Compare commits

...

17 Commits

Author  SHA1  Message  Date
aarshkshah1992  5161f087fc  fix compilation  2025-11-06 19:38:59 +04:00
aarshkshah1992  71050ab076  fix bazel  2025-11-06 19:10:19 +04:00
aarshkshah1992  614367ddcf  fix lint  2025-11-05 22:26:55 +04:00
aarshkshah1992  3f7371445b  fix test in sync  2025-11-05 22:23:26 +04:00
aarshkshah1992  a15a1ade17  fix test  2025-11-05 21:31:04 +04:00
aarshkshah1992  798376b1d7  fix bazel  2025-11-05 21:21:51 +04:00
aarshkshah1992  93271050bf  more tests  2025-11-05 21:17:03 +04:00
aarshkshah1992  8dfbabc691  fork watcher test works  2025-11-05 14:45:35 +04:00
aarshkshah1992  af2522e5f0  fix schedule  2025-11-05 09:49:27 +04:00
aarshkshah1992  452d42bd10  fix test in sync  2025-11-05 09:21:34 +04:00
aarshkshah1992  3e985377ce  fix test  2025-11-05 09:05:57 +04:00
aarshkshah1992  ab2e836d3f  fix test  2025-11-04 21:11:45 +04:00
Aarsh Shah  14158bea9c  Merge branch 'develop' into feat/use-topic-abstraction-for-gossipsub-and-refactor-fork-watcher  2025-11-04 12:02:25 -05:00
aarshkshah1992  e14590636f  bazel gazelle  2025-11-04 20:08:41 +04:00
aarshkshah1992  ce3660d2e7  finish draft  2025-11-04 20:08:11 +04:00
aarshkshah1992  7853cb9db0  changelog fragment  2025-11-03 19:14:46 +04:00
aarshkshah1992  8cfeda1473  WIP gossipsub controller  2025-11-03 19:02:41 +04:00
25 changed files with 1771 additions and 452 deletions

View File

@@ -131,7 +131,8 @@ func (s *Service) internalBroadcastAttestation(ctx context.Context, subnet uint6
s.subnetLocker(subnet).Lock()
defer s.subnetLocker(subnet).Unlock()
if err := s.FindAndDialPeersWithSubnets(ctx, AttestationSubnetTopicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
builder := func(idx uint64) string { return attestationToTopic(idx, forkDigest) + s.Encoding().ProtocolSuffix() }
if err := s.FindAndDialPeersWithSubnets(ctx, builder, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
return errors.Wrap(err, "find peers with subnets")
}
@@ -187,7 +188,8 @@ func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMs
if err := func() error {
s.subnetLocker(wrappedSubIdx).Lock()
defer s.subnetLocker(wrappedSubIdx).Unlock()
if err := s.FindAndDialPeersWithSubnets(ctx, SyncCommitteeSubnetTopicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
builder := func(idx uint64) string { return syncCommitteeToTopic(idx, forkDigest) + s.Encoding().ProtocolSuffix() }
if err := s.FindAndDialPeersWithSubnets(ctx, builder, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
return errors.Wrap(err, "find peers with subnets")
}
@@ -252,7 +254,8 @@ func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blob
s.subnetLocker(wrappedSubIdx).Lock()
defer s.subnetLocker(wrappedSubIdx).Unlock()
if err := s.FindAndDialPeersWithSubnets(ctx, BlobSubnetTopicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
builder := func(idx uint64) string { return blobSubnetToTopic(idx, forkDigest) + s.Encoding().ProtocolSuffix() }
if err := s.FindAndDialPeersWithSubnets(ctx, builder, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
return errors.Wrap(err, "find peers with subnets")
}
@@ -392,7 +395,10 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
wrappedSubIdx := subnet + dataColumnSubnetVal
// Find peers if needed.
if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, DataColumnSubnetTopicFormat, forkDigest, subnet); err != nil {
builder := func(idx uint64) string {
return dataColumnSubnetToTopic(idx, forkDigest) + s.Encoding().ProtocolSuffix()
}
if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, builder, subnet); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Error("Cannot find peers if needed")
return
@@ -487,8 +493,7 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
func (s *Service) findPeersIfNeeded(
ctx context.Context,
wrappedSubIdx uint64,
topicFormat string,
forkDigest [fieldparams.VersionLength]byte,
fullTopicForSubnet func(uint64) string,
subnet uint64,
) error {
// Sending a data column sidecar to only one peer is not ideal,
@@ -497,7 +502,7 @@ func (s *Service) findPeersIfNeeded(
defer s.subnetLocker(wrappedSubIdx).Unlock()
// No peers found, attempt to find peers with this subnet.
if err := s.FindAndDialPeersWithSubnets(ctx, topicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
if err := s.FindAndDialPeersWithSubnets(ctx, fullTopicForSubnet, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
return errors.Wrap(err, "find peers with subnet")
}
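
Note on the pattern above: the broadcast paths no longer pass a printf-style topic format plus a fork digest down to peer discovery; they pass a closure that returns the finished topic string for a given subnet. A minimal, stdlib-only sketch of that shape (topicBuilder and findPeers are illustrative names, not Prysm APIs, and the topic format string is hypothetical):

package main

import "fmt"

// topicBuilder mirrors the fullTopicForSubnet callback introduced in this change:
// given a subnet index, it returns the complete gossipsub topic string.
type topicBuilder func(subnet uint64) string

// findPeers stands in for FindAndDialPeersWithSubnets: it only ever sees
// finished topic strings and no longer needs to know how they are assembled.
func findPeers(build topicBuilder, subnets map[uint64]bool) {
	for subnet := range subnets {
		fmt.Println("searching peers for", build(subnet))
	}
}

func main() {
	forkDigest := [4]byte{0xab, 0xcd, 0xef, 0x01} // hypothetical digest
	build := func(subnet uint64) string {
		return fmt.Sprintf("/eth2/%x/beacon_attestation_%d/ssz_snappy", forkDigest, subnet)
	}
	findPeers(build, map[uint64]bool{1: true, 3: true})
}

The effect is that only topic-family code knows how a topic is formatted; broadcast and discovery treat topics as opaque strings.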

View File

@@ -5,7 +5,6 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
@@ -101,7 +100,7 @@ type (
NodeID() enode.ID
DiscoveryAddresses() ([]multiaddr.Multiaddr, error)
RefreshPersistentSubnets()
FindAndDialPeersWithSubnets(ctx context.Context, topicFormat string, digest [fieldparams.VersionLength]byte, minimumPeersPerSubnet int, subnets map[uint64]bool) error
FindAndDialPeersWithSubnets(ctx context.Context, fullTopicForSubnet func(uint64) string, minimumPeersPerSubnet int, subnets map[uint64]bool) error
AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error)
}

View File

@@ -557,3 +557,15 @@ func (s *Service) downscorePeer(peerID peer.ID, reason string) {
newScore := s.Peers().Scorers().BadResponsesScorer().Increment(peerID)
log.WithFields(logrus.Fields{"peerID": peerID, "reason": reason, "newScore": newScore}).Debug("Downscore peer")
}
func AttestationSubnets(nodeID enode.ID, node *enode.Node, record *enr.Record) (map[uint64]bool, error) {
return attestationSubnets(record)
}
func SyncSubnets(nodeID enode.ID, node *enode.Node, record *enr.Record) (map[uint64]bool, error) {
return syncSubnets(record)
}
func DataColumnSubnets(nodeID enode.ID, node *enode.Node, record *enr.Record) (map[uint64]bool, error) {
return dataColumnSubnets(nodeID, record)
}

View File

@@ -13,7 +13,6 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/consensus-types/wrapper"
@@ -84,8 +83,7 @@ func (s *Service) nodeFilter(topic string, indices map[uint64]int) (func(node *e
// In this case, the function returns an error.
func (s *Service) FindAndDialPeersWithSubnets(
ctx context.Context,
topicFormat string,
digest [fieldparams.VersionLength]byte,
fullTopicForSubnet func(uint64) string,
minimumPeersPerSubnet int,
subnets map[uint64]bool,
) error {
@@ -103,7 +101,7 @@ func (s *Service) FindAndDialPeersWithSubnets(
maxConcurrentDials = flags.Get().MaxConcurrentDials
}
defectiveSubnets := s.defectiveSubnets(topicFormat, digest, minimumPeersPerSubnet, subnets)
defectiveSubnets := s.defectiveSubnets(fullTopicForSubnet, minimumPeersPerSubnet, subnets)
for len(defectiveSubnets) > 0 {
// Stop the search/dialing loop if the context is canceled.
if err := ctx.Err(); err != nil {
@@ -114,7 +112,7 @@ func (s *Service) FindAndDialPeersWithSubnets(
ctx, cancel := context.WithTimeout(ctx, batchPeriod)
defer cancel()
peersToDial, err := s.findPeersWithSubnets(ctx, topicFormat, digest, minimumPeersPerSubnet, defectiveSubnets)
peersToDial, err := s.findPeersWithSubnets(ctx, fullTopicForSubnet, minimumPeersPerSubnet, defectiveSubnets)
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return nil, errors.Wrap(err, "find peers with subnets")
}
@@ -129,7 +127,7 @@ func (s *Service) FindAndDialPeersWithSubnets(
// Dial new peers in batches.
s.dialPeers(s.ctx, maxConcurrentDials, peersToDial)
defectiveSubnets = s.defectiveSubnets(topicFormat, digest, minimumPeersPerSubnet, subnets)
defectiveSubnets = s.defectiveSubnets(fullTopicForSubnet, minimumPeersPerSubnet, subnets)
}
return nil
@@ -158,8 +156,7 @@ func updateDefectiveSubnets(
// It returns new peers found during the search.
func (s *Service) findPeersWithSubnets(
ctx context.Context,
topicFormat string,
digest [fieldparams.VersionLength]byte,
fullTopicForSubnet func(uint64) string,
minimumPeersPerSubnet int,
defectiveSubnetsOrigin map[uint64]int,
) ([]*enode.Node, error) {
@@ -181,7 +178,13 @@ func (s *Service) findPeersWithSubnets(
}()
// Retrieve the filter function that will be used to filter nodes based on the defective subnets.
filter, err := s.nodeFilter(topicFormat, defectiveSubnets)
// Use any subnet's full topic to infer the family type from the topic string.
var sampleTopic string
for k := range defectiveSubnets {
sampleTopic = fullTopicForSubnet(k)
break
}
filter, err := s.nodeFilter(sampleTopic, defectiveSubnets)
if err != nil {
return nil, errors.Wrap(err, "node filter")
}
@@ -225,8 +228,8 @@ func (s *Service) findPeersWithSubnets(
nodeSubnets, err := filter(node)
if err != nil {
log.WithError(err).WithFields(logrus.Fields{
"nodeID": node.ID(),
"topicFormat": topicFormat,
"nodeID": node.ID(),
"topic": sampleTopic,
}).Debug("Could not get needed subnets from peer")
continue
@@ -241,7 +244,7 @@ func (s *Service) findPeersWithSubnets(
nodeByNodeID[node.ID()] = node
updateDefectiveSubnets(nodeSubnets, defectiveSubnets)
filter, err = s.nodeFilter(topicFormat, defectiveSubnets)
filter, err = s.nodeFilter(sampleTopic, defectiveSubnets)
if err != nil {
return nil, errors.Wrap(err, "node filter")
}
@@ -258,14 +261,13 @@ func (s *Service) findPeersWithSubnets(
// defectiveSubnets returns a map of subnets that have fewer than the minimum peer count.
func (s *Service) defectiveSubnets(
topicFormat string,
digest [fieldparams.VersionLength]byte,
fullTopicForSubnet func(uint64) string,
minimumPeersPerSubnet int,
subnets map[uint64]bool,
) map[uint64]int {
missingCountPerSubnet := make(map[uint64]int, len(subnets))
for subnet := range subnets {
topic := fmt.Sprintf(topicFormat, digest, subnet) + s.Encoding().ProtocolSuffix()
topic := fullTopicForSubnet(subnet)
peers := s.pubsub.ListPeers(topic)
peerCount := len(peers)
if peerCount < minimumPeersPerSubnet {
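
A subtlety in findPeersWithSubnets above: the ENR node filter still needs to know which topic family it is filtering for, and that is now inferred from the topic built for any one defective subnet. This relies on every subnet in a single call belonging to the same family. A toy, stdlib-only illustration of that assumption (familyFromTopic is illustrative and not the actual nodeFilter logic):

package main

import (
	"fmt"
	"strings"
)

// familyFromTopic is a toy stand-in for the topic inspection done by the node
// filter: all subnets passed in one call share a family, so inspecting a
// single built topic is enough to choose the right filter.
func familyFromTopic(topic string) string {
	switch {
	case strings.Contains(topic, "beacon_attestation"):
		return "attestation subnets"
	case strings.Contains(topic, "sync_committee"):
		return "sync committee subnets"
	case strings.Contains(topic, "data_column_sidecar"):
		return "data column subnets"
	default:
		return "unknown family"
	}
}

func main() {
	build := func(subnet uint64) string {
		return fmt.Sprintf("/eth2/01020304/beacon_attestation_%d/ssz_snappy", subnet)
	}
	defective := map[uint64]int{5: 2, 9: 1} // subnets still missing peers
	var sampleTopic string
	for subnet := range defective { // any key will do; all share one family
		sampleTopic = build(subnet)
		break
	}
	fmt.Println(familyFromTopic(sampleTopic))
}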

View File

@@ -168,16 +168,19 @@ func TestStartDiscV5_FindAndDialPeersWithSubnet(t *testing.T) {
}()
subnets := map[uint64]bool{1: true, 2: true, 3: true}
defectiveSubnets := service.defectiveSubnets(AttestationSubnetTopicFormat, bootNodeForkDigest, minimumPeersPerSubnet, subnets)
builder := func(idx uint64) string {
return fmt.Sprintf(AttestationSubnetTopicFormat, bootNodeForkDigest, idx) + service.Encoding().ProtocolSuffix()
}
defectiveSubnets := service.defectiveSubnets(builder, minimumPeersPerSubnet, subnets)
require.Equal(t, subnetCount, len(defectiveSubnets))
ctxWithTimeOut, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
err = service.FindAndDialPeersWithSubnets(ctxWithTimeOut, AttestationSubnetTopicFormat, bootNodeForkDigest, minimumPeersPerSubnet, subnets)
err = service.FindAndDialPeersWithSubnets(ctxWithTimeOut, builder, minimumPeersPerSubnet, subnets)
require.NoError(t, err)
defectiveSubnets = service.defectiveSubnets(AttestationSubnetTopicFormat, bootNodeForkDigest, minimumPeersPerSubnet, subnets)
defectiveSubnets = service.defectiveSubnets(builder, minimumPeersPerSubnet, subnets)
require.Equal(t, 0, len(defectiveSubnets))
}
@@ -762,10 +765,12 @@ func TestFindPeersWithSubnets_NodeDeduplication(t *testing.T) {
ctxWithTimeout, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
builder := func(idx uint64) string {
return fmt.Sprintf(AttestationSubnetTopicFormat, digest, idx) + s.Encoding().ProtocolSuffix()
}
result, err := s.findPeersWithSubnets(
ctxWithTimeout,
AttestationSubnetTopicFormat,
digest,
builder,
1,
tt.defectiveSubnets,
)
@@ -982,10 +987,12 @@ func TestFindPeersWithSubnets_FilterPeerRemoval(t *testing.T) {
ctxWithTimeout, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
builder := func(idx uint64) string {
return fmt.Sprintf(AttestationSubnetTopicFormat, digest, idx) + s.Encoding().ProtocolSuffix()
}
result, err := s.findPeersWithSubnets(
ctxWithTimeout,
AttestationSubnetTopicFormat,
digest,
builder,
1,
tt.defectiveSubnets,
)
@@ -1105,10 +1112,12 @@ func TestFindPeersWithSubnets_received_bad_existing_node(t *testing.T) {
ctxWithTimeout, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
builder := func(idx uint64) string {
return fmt.Sprintf(AttestationSubnetTopicFormat, digest, idx) + service.Encoding().ProtocolSuffix()
}
result, err := service.findPeersWithSubnets(
ctxWithTimeout,
AttestationSubnetTopicFormat,
digest,
builder,
1,
map[uint64]int{1: 2}, // Need 2 peers for subnet 1
)

View File

@@ -23,7 +23,6 @@ go_library(
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",

View File

@@ -5,7 +5,6 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
@@ -71,7 +70,7 @@ func (*FakeP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
}
// FindAndDialPeersWithSubnets mocks the p2p func.
func (*FakeP2P) FindAndDialPeersWithSubnets(ctx context.Context, topicFormat string, digest [fieldparams.VersionLength]byte, minimumPeersPerSubnet int, subnets map[uint64]bool) error {
func (*FakeP2P) FindAndDialPeersWithSubnets(ctx context.Context, fullTopicForSubnet func(uint64) string, minimumPeersPerSubnet int, subnets map[uint64]bool) error {
return nil
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"errors"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p/core/host"
@@ -58,7 +57,7 @@ func (m *MockPeerManager) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
func (*MockPeerManager) RefreshPersistentSubnets() {}
// FindAndDialPeersWithSubnet .
func (*MockPeerManager) FindAndDialPeersWithSubnets(ctx context.Context, topicFormat string, digest [fieldparams.VersionLength]byte, minimumPeersPerSubnet int, subnets map[uint64]bool) error {
func (*MockPeerManager) FindAndDialPeersWithSubnets(ctx context.Context, fullTopicForSubnet func(uint64) string, minimumPeersPerSubnet int, subnets map[uint64]bool) error {
return nil
}

View File

@@ -15,7 +15,6 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
@@ -183,11 +182,7 @@ func (p *TestP2P) ReceivePubSub(topic string, msg proto.Message) {
if _, err := p.Encoding().EncodeGossip(buf, castedMsg); err != nil {
p.t.Fatalf("Failed to encode message: %v", err)
}
digest, err := p.ForkDigest()
if err != nil {
p.t.Fatal(err)
}
topicHandle, err := ps.Join(fmt.Sprintf(topic, digest) + p.Encoding().ProtocolSuffix())
topicHandle, err := ps.Join(topic)
if err != nil {
p.t.Fatal(err)
}
@@ -279,6 +274,9 @@ func (p *TestP2P) SubscribeToTopic(topic string, opts ...pubsub.SubOpt) (*pubsub
// LeaveTopic closes topic and removes corresponding handler from list of joined topics.
// This method will return error if there are outstanding event handlers or subscriptions.
func (p *TestP2P) LeaveTopic(topic string) error {
p.mu.Lock()
defer p.mu.Unlock()
if t, ok := p.joinedTopics[topic]; ok {
if err := t.Close(); err != nil {
return err
@@ -420,7 +418,7 @@ func (p *TestP2P) Peers() *peers.Status {
}
// FindAndDialPeersWithSubnets mocks the p2p func.
func (*TestP2P) FindAndDialPeersWithSubnets(ctx context.Context, topicFormat string, digest [fieldparams.VersionLength]byte, minimumPeersPerSubnet int, subnets map[uint64]bool) error {
func (*TestP2P) FindAndDialPeersWithSubnets(ctx context.Context, fullTopicForSubnet func(uint64) string, minimumPeersPerSubnet int, subnets map[uint64]bool) error {
return nil
}
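
With the ReceivePubSub change above, the test helper joins exactly the topic it is given, so test callers now assemble the full topic (fork digest plus protocol suffix) themselves; the sync_test.go hunks further down show the call-site pattern. A trivial, stdlib-only sketch of that assembly (digest and suffix values are hypothetical):

package main

import "fmt"

func main() {
	// Callers build the finished topic: format string + fork digest + encoder suffix.
	forkDigest := [4]byte{0xde, 0xad, 0xbe, 0xef} // hypothetical digest
	fullTopic := fmt.Sprintf("/eth2/%x/beacon_block", forkDigest) + "/ssz_snappy"
	fmt.Println(fullTopic) // /eth2/deadbeef/beacon_block/ssz_snappy
}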

View File

@@ -15,6 +15,8 @@ go_library(
"error.go",
"fork_watcher.go",
"fuzz_exports.go", # keep
"gossipsub_controller.go",
"gossipsub_topic_family.go",
"log.go",
"metrics.go",
"once.go",
@@ -49,6 +51,9 @@ go_library(
"subscriber_sync_committee_message.go",
"subscriber_sync_contribution_proof.go",
"subscription_topic_handler.go",
"topic_families_dynamic_subnets.go",
"topic_families_static_subnets.go",
"topic_families_without_subnets.go",
"validate_aggregate_proof.go",
"validate_attester_slashing.go",
"validate_beacon_attestation.go",
@@ -135,6 +140,8 @@ go_library(
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_hashicorp_golang_lru//:go_default_library",
"@com_github_libp2p_go_libp2p//core:go_default_library",
"@com_github_libp2p_go_libp2p//core/host:go_default_library",
@@ -172,6 +179,8 @@ go_test(
"decode_pubsub_test.go",
"error_test.go",
"fork_watcher_test.go",
"gossipsub_controller_test.go",
"gossipsub_topic_family_test.go",
"kzg_batch_verifier_test.go",
"once_test.go",
"pending_attestations_queue_bucket_test.go",
@@ -283,6 +292,7 @@ go_test(
"@com_github_d4l3k_messagediff//:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_golang_snappy//:go_default_library",
"@com_github_libp2p_go_libp2p//:go_default_library",

View File

@@ -1,7 +1,6 @@
package sync
import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/time/slots"
@@ -13,24 +12,17 @@ import (
// - We are subscribed to the correct gossipsub topics (for the current and upcoming epoch).
// - We have registered the correct RPC stream handlers (for the current and upcoming epoch).
// - We have cleaned up gossipsub topics and RPC stream handlers that are no longer needed.
func (s *Service) p2pHandlerControlLoop() {
// At startup, launch registration and peer discovery loops, and register rpc stream handlers.
startEntry := params.GetNetworkScheduleEntry(s.cfg.clock.CurrentEpoch())
s.registerSubscribers(startEntry)
func (s *Service) p2pRPCHandlerControlLoop() {
slotTicker := slots.NewSlotTicker(s.cfg.clock.GenesisTime(), params.BeaconConfig().SecondsPerSlot)
for {
select {
// In the event of a node restart, we will still end up subscribing to the correct
// topics during/after the fork epoch. This routine is to ensure correct
// subscriptions for nodes running before a fork epoch.
case <-slotTicker.C():
current := s.cfg.clock.CurrentEpoch()
if err := s.ensureRegistrationsForEpoch(current); err != nil {
if err := s.ensureRPCRegistrationsForEpoch(current); err != nil {
log.WithError(err).Error("Unable to check for fork in the next epoch")
continue
}
if err := s.ensureDeregistrationForEpoch(current); err != nil {
if err := s.ensureRPCDeregistrationForEpoch(current); err != nil {
log.WithError(err).Error("Unable to check for fork in the previous epoch")
continue
}
@@ -44,9 +36,8 @@ func (s *Service) p2pHandlerControlLoop() {
// ensureRegistrationsForEpoch ensures that gossip topic and RPC stream handler
// registrations are in place for the current and subsequent epoch.
func (s *Service) ensureRegistrationsForEpoch(epoch primitives.Epoch) error {
func (s *Service) ensureRPCRegistrationsForEpoch(epoch primitives.Epoch) error {
current := params.GetNetworkScheduleEntry(epoch)
s.registerSubscribers(current)
currentHandler, err := s.rpcHandlerByTopicFromFork(current.VersionEnum)
if err != nil {
@@ -62,7 +53,6 @@ func (s *Service) ensureRegistrationsForEpoch(epoch primitives.Epoch) error {
if current.Epoch == next.Epoch {
return nil // no fork in the next epoch
}
s.registerSubscribers(next)
if s.digestActionDone(next.ForkDigest, registerRpcOnce) {
return nil
@@ -84,7 +74,7 @@ func (s *Service) ensureRegistrationsForEpoch(epoch primitives.Epoch) error {
}
// ensureDeregistrationForEpoch deregisters appropriate gossip and RPC topic if there is a fork in the current epoch.
func (s *Service) ensureDeregistrationForEpoch(currentEpoch primitives.Epoch) error {
func (s *Service) ensureRPCDeregistrationForEpoch(currentEpoch primitives.Epoch) error {
current := params.GetNetworkScheduleEntry(currentEpoch)
// If we are still in our genesis fork version then exit early.
@@ -115,20 +105,5 @@ func (s *Service) ensureDeregistrationForEpoch(currentEpoch primitives.Epoch) er
}
}
// Unsubscribe from all gossip topics with the previous fork digest.
if s.digestActionDone(previous.ForkDigest, unregisterGossipOnce) {
return nil
}
for _, t := range s.subHandler.allTopics() {
retDigest, err := p2p.ExtractGossipDigest(t)
if err != nil {
log.WithError(err).Error("Could not retrieve digest")
continue
}
if retDigest == previous.ForkDigest {
s.unSubscribeFromTopic(t)
}
}
return nil
}
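
The fork watcher is narrowed above: gossip subscription management moves to the GossipsubController, and this loop only reconciles RPC stream handlers once per slot. A toy, stdlib-only model of that remaining responsibility (the real loop uses Prysm's slot ticker and clock; ensureRPC is a placeholder for the ensureRPC* methods):

package main

import (
	"context"
	"fmt"
	"time"
)

// rpcControlLoop reconciles RPC handlers on every tick until the context ends.
func rpcControlLoop(ctx context.Context, slot time.Duration, ensureRPC func(epoch uint64) error) {
	ticker := time.NewTicker(slot)
	defer ticker.Stop()
	var epoch uint64
	for {
		select {
		case <-ticker.C:
			epoch++ // stand-in for clock.CurrentEpoch()
			if err := ensureRPC(epoch); err != nil {
				fmt.Println("rpc reconciliation failed:", err)
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 35*time.Millisecond)
	defer cancel()
	rpcControlLoop(ctx, 10*time.Millisecond, func(epoch uint64) error {
		fmt.Println("ensured RPC handlers for epoch", epoch)
		return nil
	})
}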

View File

@@ -2,8 +2,6 @@ package sync
import (
"context"
"fmt"
"sync"
"testing"
"time"
@@ -50,30 +48,6 @@ func testForkWatcherService(t *testing.T, current primitives.Epoch) *Service {
return r
}
func TestRegisterSubscriptions_Idempotent(t *testing.T) {
params.SetupTestConfigCleanup(t)
genesis.StoreEmbeddedDuringTest(t, params.BeaconConfig().ConfigName)
fulu := params.BeaconConfig().ElectraForkEpoch + 4096*2
params.BeaconConfig().FuluForkEpoch = fulu
params.BeaconConfig().InitializeForkSchedule()
current := fulu - 1
s := testForkWatcherService(t, current)
next := params.GetNetworkScheduleEntry(fulu)
wg := attachSpawner(s)
require.Equal(t, true, s.registerSubscribers(next))
done := make(chan struct{})
go func() { wg.Wait(); close(done) }()
select {
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for subscriptions to be registered")
case <-done:
}
// the goal of this callback is just to assert that spawn is never called.
s.subscriptionSpawner = func(func()) { t.Error("registration routines spawned twice for the same digest") }
require.NoError(t, s.ensureRegistrationsForEpoch(fulu))
}
func TestService_CheckForNextEpochFork(t *testing.T) {
closedChan := make(chan struct{})
close(closedChan)
@@ -103,7 +77,6 @@ func TestService_CheckForNextEpochFork(t *testing.T) {
epochAtRegistration: func(e primitives.Epoch) primitives.Epoch { return e - 1 },
nextForkEpoch: params.BeaconConfig().BellatrixForkEpoch,
checkRegistration: func(t *testing.T, s *Service) {
digest := params.ForkDigest(params.BeaconConfig().AltairForkEpoch)
rpcMap := make(map[string]bool)
for _, p := range s.cfg.p2p.Host().Mux().Protocols() {
rpcMap[string(p)] = true
@@ -111,8 +84,6 @@ func TestService_CheckForNextEpochFork(t *testing.T) {
assert.Equal(t, true, rpcMap[p2p.RPCBlocksByRangeTopicV2+s.cfg.p2p.Encoding().ProtocolSuffix()], "topic doesn't exist")
assert.Equal(t, true, rpcMap[p2p.RPCBlocksByRootTopicV2+s.cfg.p2p.Encoding().ProtocolSuffix()], "topic doesn't exist")
assert.Equal(t, true, rpcMap[p2p.RPCMetaDataTopicV2+s.cfg.p2p.Encoding().ProtocolSuffix()], "topic doesn't exist")
expected := fmt.Sprintf(p2p.SyncContributionAndProofSubnetTopicFormat+s.cfg.p2p.Encoding().ProtocolSuffix(), digest)
assert.Equal(t, true, s.subHandler.topicExists(expected), "subnet topic doesn't exist")
// TODO: we should check subcommittee indices here but we need to work with the committee cache to do it properly
/*
subIndices := mapFromCount(params.BeaconConfig().SyncCommitteeSubnetCount)
@@ -127,14 +98,10 @@ func TestService_CheckForNextEpochFork(t *testing.T) {
{
name: "capella fork in the next epoch",
checkRegistration: func(t *testing.T, s *Service) {
digest := params.ForkDigest(params.BeaconConfig().CapellaForkEpoch)
rpcMap := make(map[string]bool)
for _, p := range s.cfg.p2p.Host().Mux().Protocols() {
rpcMap[string(p)] = true
}
expected := fmt.Sprintf(p2p.BlsToExecutionChangeSubnetTopicFormat+s.cfg.p2p.Encoding().ProtocolSuffix(), digest)
assert.Equal(t, true, s.subHandler.topicExists(expected), "subnet topic doesn't exist")
},
forkEpoch: params.BeaconConfig().CapellaForkEpoch,
nextForkEpoch: params.BeaconConfig().DenebForkEpoch,
@@ -143,17 +110,10 @@ func TestService_CheckForNextEpochFork(t *testing.T) {
{
name: "deneb fork in the next epoch",
checkRegistration: func(t *testing.T, s *Service) {
digest := params.ForkDigest(params.BeaconConfig().DenebForkEpoch)
rpcMap := make(map[string]bool)
for _, p := range s.cfg.p2p.Host().Mux().Protocols() {
rpcMap[string(p)] = true
}
subIndices := mapFromCount(params.BeaconConfig().BlobsidecarSubnetCount)
for idx := range subIndices {
topic := fmt.Sprintf(p2p.BlobSubnetTopicFormat, digest, idx)
expected := topic + s.cfg.p2p.Encoding().ProtocolSuffix()
assert.Equal(t, true, s.subHandler.topicExists(expected), fmt.Sprintf("subnet topic %s doesn't exist", expected))
}
assert.Equal(t, true, rpcMap[p2p.RPCBlobSidecarsByRangeTopicV1+s.cfg.p2p.Encoding().ProtocolSuffix()], "topic doesn't exist")
assert.Equal(t, true, rpcMap[p2p.RPCBlobSidecarsByRootTopicV1+s.cfg.p2p.Encoding().ProtocolSuffix()], "topic doesn't exist")
},
@@ -162,16 +122,8 @@ func TestService_CheckForNextEpochFork(t *testing.T) {
epochAtRegistration: func(e primitives.Epoch) primitives.Epoch { return e - 1 },
},
{
name: "electra fork in the next epoch",
checkRegistration: func(t *testing.T, s *Service) {
digest := params.ForkDigest(params.BeaconConfig().ElectraForkEpoch)
subIndices := mapFromCount(params.BeaconConfig().BlobsidecarSubnetCountElectra)
for idx := range subIndices {
topic := fmt.Sprintf(p2p.BlobSubnetTopicFormat, digest, idx)
expected := topic + s.cfg.p2p.Encoding().ProtocolSuffix()
assert.Equal(t, true, s.subHandler.topicExists(expected), fmt.Sprintf("subnet topic %s doesn't exist", expected))
}
},
name: "electra fork in the next epoch",
checkRegistration: func(t *testing.T, s *Service) {},
forkEpoch: params.BeaconConfig().ElectraForkEpoch,
nextForkEpoch: params.BeaconConfig().FuluForkEpoch,
epochAtRegistration: func(e primitives.Epoch) primitives.Epoch { return e - 1 },
@@ -194,54 +146,28 @@ func TestService_CheckForNextEpochFork(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
current := tt.epochAtRegistration(tt.forkEpoch)
s := testForkWatcherService(t, current)
wg := attachSpawner(s)
require.NoError(t, s.ensureRegistrationsForEpoch(s.cfg.clock.CurrentEpoch()))
wg.Wait()
require.NoError(t, s.ensureRPCRegistrationsForEpoch(s.cfg.clock.CurrentEpoch()))
tt.checkRegistration(t, s)
if current != tt.forkEpoch-1 {
return
}
// Ensure the topics were registered for the upcoming fork
digest := params.ForkDigest(tt.forkEpoch)
assert.Equal(t, true, s.subHandler.digestExists(digest))
// After this point we are checking deregistration, which doesn't apply if there isn't a higher
// nextForkEpoch.
if tt.forkEpoch >= tt.nextForkEpoch {
return
}
nextDigest := params.ForkDigest(tt.nextForkEpoch)
// Move the clock to just before the next fork epoch and ensure deregistration is correct
wg = attachSpawner(s)
s.cfg.clock = defaultClockWithTimeAtEpoch(tt.nextForkEpoch - 1)
require.NoError(t, s.ensureRegistrationsForEpoch(s.cfg.clock.CurrentEpoch()))
wg.Wait()
require.NoError(t, s.ensureRPCRegistrationsForEpoch(s.cfg.clock.CurrentEpoch()))
require.NoError(t, s.ensureDeregistrationForEpoch(tt.nextForkEpoch))
assert.Equal(t, true, s.subHandler.digestExists(digest))
// deregister as if it is the epoch after the next fork epoch
require.NoError(t, s.ensureDeregistrationForEpoch(tt.nextForkEpoch+1))
assert.Equal(t, false, s.subHandler.digestExists(digest))
assert.Equal(t, true, s.subHandler.digestExists(nextDigest))
require.NoError(t, s.ensureRPCDeregistrationForEpoch(tt.nextForkEpoch))
})
}
}
func attachSpawner(s *Service) *sync.WaitGroup {
wg := new(sync.WaitGroup)
s.subscriptionSpawner = func(f func()) {
wg.Add(1)
go func() {
defer wg.Done()
f()
}()
}
return wg
}
// oneEpoch returns the duration of one epoch.
func oneEpoch() time.Duration {
return time.Duration(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot)) * time.Second

View File

@@ -0,0 +1,144 @@
package sync
import (
"context"
"fmt"
"sync"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/sirupsen/logrus"
)
type topicFamilyKey struct {
topicName string
forkDigest [4]byte
}
func topicFamilyKeyFrom(tf GossipsubTopicFamily) topicFamilyKey {
return topicFamilyKey{topicName: fmt.Sprintf("%s", tf.Name()), forkDigest: tf.NetworkScheduleEntry().ForkDigest}
}
type GossipsubController struct {
ctx context.Context
cancel context.CancelFunc
syncService *Service
wg sync.WaitGroup
mu sync.RWMutex
activeTopicFamilies map[topicFamilyKey]GossipsubTopicFamily
}
func NewGossipsubController(ctx context.Context, s *Service) *GossipsubController {
ctx, cancel := context.WithCancel(ctx)
return &GossipsubController{
ctx: ctx,
cancel: cancel,
syncService: s,
activeTopicFamilies: make(map[topicFamilyKey]GossipsubTopicFamily),
}
}
func (g *GossipsubController) Start() {
currentEpoch := g.syncService.cfg.clock.CurrentEpoch()
if err := g.syncService.waitForInitialSync(g.ctx); err != nil {
log.WithError(err).Debug("Context cancelled while waiting for initial sync, not starting GossipsubController")
return
}
g.updateActiveTopicFamilies(currentEpoch)
g.wg.Go(func() { g.controlLoop() })
log.Info("GossipsubController started")
}
func (g *GossipsubController) controlLoop() {
slotTicker := slots.NewSlotTicker(g.syncService.cfg.clock.GenesisTime(), params.BeaconConfig().SecondsPerSlot)
defer slotTicker.Done()
for {
select {
case <-slotTicker.C():
currentEpoch := g.syncService.cfg.clock.CurrentEpoch()
g.updateActiveTopicFamilies(currentEpoch)
case <-g.ctx.Done():
return
}
}
}
func (g *GossipsubController) updateActiveTopicFamilies(currentEpoch primitives.Epoch) {
currentNSE := params.GetNetworkScheduleEntry(currentEpoch)
families := TopicFamiliesForEpoch(currentEpoch, g.syncService, currentNSE)
isForkBoundary, nextNSE := isNextEpochForkBoundary(currentEpoch)
if isForkBoundary {
families = append(families, TopicFamiliesForEpoch(nextNSE.Epoch, g.syncService, nextNSE)...)
}
g.mu.Lock()
defer g.mu.Unlock()
// register topic families for the current NSE -> this is idempotent
for _, family := range families {
key := topicFamilyKeyFrom(family)
if _, ok := g.activeTopicFamilies[key]; ok {
continue
}
g.activeTopicFamilies[key] = family
family.Subscribe()
log.WithFields(logrus.Fields{
"topicName": key.topicName,
"forkDigest": fmt.Sprintf("%#x", key.forkDigest),
"epoch": currentEpoch,
}).Info("Registered topic family")
}
// remove topic families for the previous NSE -> this is idempotent
if beyond, previous := isOneEpochBeyondForkBoundary(currentEpoch); beyond {
for key, family := range g.activeTopicFamilies {
if key.forkDigest == previous.ForkDigest {
family.Unsubscribe()
delete(g.activeTopicFamilies, key)
log.WithFields(logrus.Fields{
"topicName": key.topicName,
"forkDigest": fmt.Sprintf("%#x", key.forkDigest),
}).Info("Removed topic family")
}
}
}
}
func (g *GossipsubController) Stop() {
g.cancel()
g.wg.Wait()
}
func isNextEpochForkBoundary(currentEpoch primitives.Epoch) (bool, params.NetworkScheduleEntry) {
current := params.GetNetworkScheduleEntry(currentEpoch)
next := params.GetNetworkScheduleEntry(currentEpoch + 1)
if current.Epoch == next.Epoch {
return false, next // no fork in the next epoch
}
return true, next // there is a fork in the next epoch
}
func isOneEpochBeyondForkBoundary(currentEpoch primitives.Epoch) (bool, params.NetworkScheduleEntry) {
current := params.GetNetworkScheduleEntry(currentEpoch)
previous := params.GetNetworkScheduleEntry(current.Epoch - 1)
if current.Epoch == params.BeaconConfig().GenesisEpoch {
return false, previous
}
if currentEpoch < current.Epoch+1 {
return false, previous
}
return true, previous
}
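
The controller's reconciliation amounts to a small window around each fork: families for the next digest are joined one epoch before the fork (isNextEpochForkBoundary), and families carrying the previous digest are dropped once the chain is at least one epoch past it (isOneEpochBeyondForkBoundary). A toy model of that window, assuming a single hypothetical fork at epoch 100:

package main

import "fmt"

// activeDigests models the subscribe/unsubscribe window implemented by
// updateActiveTopicFamilies above, for one hypothetical fork at epoch 100.
func activeDigests(epoch uint64) []string {
	const forkEpoch = 100
	switch {
	case epoch < forkEpoch-1:
		return []string{"pre-fork digest"}
	case epoch <= forkEpoch:
		// One epoch before the fork and the fork epoch itself: both digests live.
		return []string{"pre-fork digest", "post-fork digest"}
	default:
		// From one epoch past the fork, the old digest's families are unsubscribed.
		return []string{"post-fork digest"}
	}
}

func main() {
	for _, epoch := range []uint64{98, 99, 100, 101} {
		fmt.Println("epoch", epoch, "->", activeDigests(epoch))
	}
}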

View File

@@ -0,0 +1,182 @@
package sync
import (
"context"
"fmt"
"strings"
"testing"
"time"
"github.com/OffchainLabs/prysm/v6/async/abool"
mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
mockSync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync/initial-sync/testing"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/genesis"
"github.com/OffchainLabs/prysm/v6/testing/assert"
)
func testGossipsubControllerService(t *testing.T, current primitives.Epoch) *Service {
closedChan := make(chan struct{})
close(closedChan)
peer2peer := p2ptest.NewTestP2P(t)
chainService := &mockChain.ChainService{
Genesis: genesis.Time(),
ValidatorsRoot: genesis.ValidatorsRoot(),
}
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Millisecond)
r := &Service{
ctx: ctx,
cancel: cancel,
cfg: &config{
p2p: peer2peer,
chain: chainService,
clock: defaultClockWithTimeAtEpoch(current),
initialSync: &mockSync.Sync{IsSyncing: false},
},
chainStarted: abool.New(),
subHandler: newSubTopicHandler(),
initialSyncComplete: closedChan,
}
r.gossipsubController = NewGossipsubController(context.Background(), r)
return r
}
func TestGossipsubController_CheckForNextEpochForkSubscriptions(t *testing.T) {
closedChan := make(chan struct{})
close(closedChan)
params.SetupTestConfigCleanup(t)
genesis.StoreEmbeddedDuringTest(t, params.BeaconConfig().ConfigName)
params.BeaconConfig().FuluForkEpoch = params.BeaconConfig().ElectraForkEpoch + 4096*2
params.BeaconConfig().InitializeForkSchedule()
tests := []struct {
name string
svcCreator func(t *testing.T) *Service
checkRegistration func(t *testing.T, s *Service)
forkEpoch primitives.Epoch
epochAtRegistration func(primitives.Epoch) primitives.Epoch
nextForkEpoch primitives.Epoch
}{
{
name: "no fork in the next epoch",
forkEpoch: params.BeaconConfig().AltairForkEpoch,
epochAtRegistration: func(e primitives.Epoch) primitives.Epoch { return e - 2 },
nextForkEpoch: params.BeaconConfig().BellatrixForkEpoch,
checkRegistration: func(t *testing.T, s *Service) {},
},
{
name: "altair fork in the next epoch",
forkEpoch: params.BeaconConfig().AltairForkEpoch,
epochAtRegistration: func(e primitives.Epoch) primitives.Epoch { return e - 1 },
nextForkEpoch: params.BeaconConfig().BellatrixForkEpoch,
checkRegistration: func(t *testing.T, s *Service) {
digest := params.ForkDigest(params.BeaconConfig().AltairForkEpoch)
expected := fmt.Sprintf(p2p.SyncContributionAndProofSubnetTopicFormat+s.cfg.p2p.Encoding().ProtocolSuffix(), digest)
assert.Equal(t, true, s.subHandler.topicExists(expected), "subnet topic doesn't exist")
},
},
{
name: "capella fork in the next epoch",
checkRegistration: func(t *testing.T, s *Service) {
digest := params.ForkDigest(params.BeaconConfig().CapellaForkEpoch)
rpcMap := make(map[string]bool)
for _, p := range s.cfg.p2p.Host().Mux().Protocols() {
rpcMap[string(p)] = true
}
expected := fmt.Sprintf(p2p.BlsToExecutionChangeSubnetTopicFormat+s.cfg.p2p.Encoding().ProtocolSuffix(), digest)
assert.Equal(t, true, s.subHandler.topicExists(expected), "subnet topic doesn't exist")
},
forkEpoch: params.BeaconConfig().CapellaForkEpoch,
nextForkEpoch: params.BeaconConfig().DenebForkEpoch,
epochAtRegistration: func(e primitives.Epoch) primitives.Epoch { return e - 1 },
},
{
name: "deneb fork in the next epoch",
checkRegistration: func(t *testing.T, s *Service) {
digest := params.ForkDigest(params.BeaconConfig().DenebForkEpoch)
subIndices := mapFromCount(params.BeaconConfig().BlobsidecarSubnetCount)
for idx := range subIndices {
topic := fmt.Sprintf(p2p.BlobSubnetTopicFormat, digest, idx)
expected := topic + s.cfg.p2p.Encoding().ProtocolSuffix()
assert.Equal(t, true, s.subHandler.topicExists(expected), fmt.Sprintf("subnet topic %s doesn't exist", expected))
}
},
forkEpoch: params.BeaconConfig().DenebForkEpoch,
nextForkEpoch: params.BeaconConfig().ElectraForkEpoch,
epochAtRegistration: func(e primitives.Epoch) primitives.Epoch { return e - 1 },
},
{
name: "electra fork in the next epoch",
checkRegistration: func(t *testing.T, s *Service) {
digest := params.ForkDigest(params.BeaconConfig().ElectraForkEpoch)
subIndices := mapFromCount(params.BeaconConfig().BlobsidecarSubnetCountElectra)
for idx := range subIndices {
topic := fmt.Sprintf(p2p.BlobSubnetTopicFormat, digest, idx)
expected := topic + s.cfg.p2p.Encoding().ProtocolSuffix()
assert.Equal(t, true, s.subHandler.topicExists(expected), fmt.Sprintf("subnet topic %s doesn't exist", expected))
}
},
forkEpoch: params.BeaconConfig().ElectraForkEpoch,
nextForkEpoch: params.BeaconConfig().FuluForkEpoch,
epochAtRegistration: func(e primitives.Epoch) primitives.Epoch { return e - 1 },
},
{
name: "fulu fork in the next epoch; should not have blob topics",
checkRegistration: func(t *testing.T, s *Service) {
// Advance to two epochs after Fulu activation and assert no blob topics remain.
fulu := params.BeaconConfig().FuluForkEpoch
target := fulu + 2
s.cfg.clock = defaultClockWithTimeAtEpoch(target)
s.gossipsubController.updateActiveTopicFamilies(s.cfg.clock.CurrentEpoch())
for _, topic := range s.subHandler.allTopics() {
if strings.Contains(topic, "/"+p2p.GossipBlobSidecarMessage) {
t.Fatalf("blob topic still exists after Fulu+2: %s", topic)
}
}
},
forkEpoch: params.BeaconConfig().FuluForkEpoch,
nextForkEpoch: params.BeaconConfig().FuluForkEpoch,
epochAtRegistration: func(e primitives.Epoch) primitives.Epoch { return e - 1 },
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
current := tt.epochAtRegistration(tt.forkEpoch)
s := testGossipsubControllerService(t, current)
s.gossipsubController.updateActiveTopicFamilies(s.cfg.clock.CurrentEpoch())
tt.checkRegistration(t, s)
if current != tt.forkEpoch-1 {
return
}
// Ensure the topics were registered for the upcoming fork
digest := params.ForkDigest(tt.forkEpoch)
assert.Equal(t, true, s.subHandler.digestExists(digest))
// After this point we are checking deregistration, which doesn't apply if there isn't a higher
// nextForkEpoch.
if tt.forkEpoch >= tt.nextForkEpoch {
return
}
nextDigest := params.ForkDigest(tt.nextForkEpoch)
// Move the clock to just before the next fork epoch and ensure deregistration is correct
s.cfg.clock = defaultClockWithTimeAtEpoch(tt.nextForkEpoch - 1)
s.gossipsubController.updateActiveTopicFamilies(s.cfg.clock.CurrentEpoch())
s.gossipsubController.updateActiveTopicFamilies(tt.nextForkEpoch)
assert.Equal(t, true, s.subHandler.digestExists(digest))
// deregister as if it is the epoch after the next fork epoch
s.gossipsubController.updateActiveTopicFamilies(tt.nextForkEpoch + 1)
assert.Equal(t, false, s.subHandler.digestExists(digest))
assert.Equal(t, true, s.subHandler.digestExists(nextDigest))
})
}
}

View File

@@ -0,0 +1,152 @@
package sync
import (
"context"
"github.com/OffchainLabs/prysm/v6/config/features"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/ethereum/go-ethereum/p2p/enode"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
"google.golang.org/protobuf/proto"
)
// wrappedVal represents a gossip validator which also returns an error along with the result.
type wrappedVal func(context.Context, peer.ID, *pubsub.Message) (pubsub.ValidationResult, error)
// subHandler represents handler for a given subscription.
type subHandler func(context.Context, proto.Message) error
// noopHandler is used for subscriptions that do not require anything to be done.
var noopHandler subHandler = func(ctx context.Context, msg proto.Message) error {
return nil
}
type baseGossipsubTopicFamily struct {
syncService *Service
protocolSuffix string
nse params.NetworkScheduleEntry
}
func (b *baseGossipsubTopicFamily) NetworkScheduleEntry() params.NetworkScheduleEntry {
return b.nse
}
type GossipsubTopicFamily interface {
Name() string
Validator() wrappedVal
Handler() subHandler
NetworkScheduleEntry() params.NetworkScheduleEntry
Subscribe()
Unsubscribe()
}
type GossipsubTopicFamilyWithoutDynamicSubnets interface {
GossipsubTopicFamily
GetFullTopicString() string
}
type GossipsubTopicFamilyWithDynamicSubnets interface {
GossipsubTopicFamily
GetFullTopicString(subnet uint64) string
GetSubnetsToJoin(slot primitives.Slot) map[uint64]bool
GetSubnetsForBroadcast(slot primitives.Slot) map[uint64]bool
GetTopicsForNode(node *enode.Node) ([]string, error)
}
type topicFamilyEntry struct {
activationEpoch primitives.Epoch
deactivationEpoch *primitives.Epoch // optional; inactive at >= deactivationEpoch
factory func(s *Service, nse params.NetworkScheduleEntry) []GossipsubTopicFamily
}
func topicFamilySchedule() []topicFamilyEntry {
cfg := params.BeaconConfig()
return []topicFamilyEntry{
// Genesis topic families
{
activationEpoch: cfg.GenesisEpoch,
factory: func(s *Service, nse params.NetworkScheduleEntry) []GossipsubTopicFamily {
return []GossipsubTopicFamily{
NewBlockTopicFamily(s, nse),
NewAggregateAndProofTopicFamily(s, nse),
NewVoluntaryExitTopicFamily(s, nse),
NewProposerSlashingTopicFamily(s, nse),
NewAttesterSlashingTopicFamily(s, nse),
NewAttestationTopicFamily(s, nse),
}
},
},
// Altair topic families
{
activationEpoch: cfg.AltairForkEpoch,
factory: func(s *Service, nse params.NetworkScheduleEntry) []GossipsubTopicFamily {
families := []GossipsubTopicFamily{
NewSyncContributionAndProofTopicFamily(s, nse),
NewSyncCommitteeTopicFamily(s, nse),
}
if features.Get().EnableLightClient {
families = append(families,
NewLightClientOptimisticUpdateTopicFamily(s, nse),
NewLightClientFinalityUpdateTopicFamily(s, nse),
)
}
return families
},
},
// Capella topic families
{
activationEpoch: cfg.CapellaForkEpoch,
factory: func(s *Service, nse params.NetworkScheduleEntry) []GossipsubTopicFamily {
return []GossipsubTopicFamily{NewBlsToExecutionChangeTopicFamily(s, nse)}
},
},
// Blob topic families (static per-subnet) in Deneb and Electra forks (removed in Fulu)
{
activationEpoch: cfg.DenebForkEpoch,
deactivationEpoch: func() *primitives.Epoch { e := cfg.ElectraForkEpoch; return &e }(),
factory: func(s *Service, nse params.NetworkScheduleEntry) []GossipsubTopicFamily {
count := cfg.BlobsidecarSubnetCount
families := make([]GossipsubTopicFamily, 0, count)
for i := uint64(0); i < count; i++ {
families = append(families, NewBlobTopicFamily(s, nse, i))
}
return families
},
},
{
activationEpoch: cfg.ElectraForkEpoch,
deactivationEpoch: func() *primitives.Epoch { e := cfg.FuluForkEpoch; return &e }(),
factory: func(s *Service, nse params.NetworkScheduleEntry) []GossipsubTopicFamily {
count := cfg.BlobsidecarSubnetCountElectra
families := make([]GossipsubTopicFamily, 0, count)
for i := uint64(0); i < count; i++ {
families = append(families, NewBlobTopicFamily(s, nse, i))
}
return families
},
},
// Fulu data column topic family
{
activationEpoch: cfg.FuluForkEpoch,
factory: func(s *Service, nse params.NetworkScheduleEntry) []GossipsubTopicFamily {
return []GossipsubTopicFamily{NewDataColumnTopicFamily(s, nse)}
},
},
}
}
func TopicFamiliesForEpoch(epoch primitives.Epoch, s *Service, nse params.NetworkScheduleEntry) []GossipsubTopicFamily {
var activeFamilies []GossipsubTopicFamily
for _, entry := range topicFamilySchedule() {
if epoch < entry.activationEpoch {
continue
}
if entry.deactivationEpoch != nil && epoch >= *entry.deactivationEpoch {
continue
}
activeFamilies = append(activeFamilies, entry.factory(s, nse)...)
}
return activeFamilies
}
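
TopicFamiliesForEpoch above treats each schedule entry as a half-open window: active when activationEpoch <= epoch and, if deactivationEpoch is set, epoch < deactivationEpoch. That is how the six Deneb blob subnets hand over to the eight Electra ones and how blob topics drop out entirely at Fulu in favor of data columns. A stdlib-only sketch of the same window logic (fork epochs are hypothetical):

package main

import "fmt"

type entry struct {
	name         string
	activation   uint64
	deactivation *uint64 // nil means the family never deactivates
}

// familiesFor mirrors the [activation, deactivation) window used by
// TopicFamiliesForEpoch above.
func familiesFor(epoch uint64, entries []entry) []string {
	var active []string
	for _, e := range entries {
		if epoch < e.activation {
			continue
		}
		if e.deactivation != nil && epoch >= *e.deactivation {
			continue
		}
		active = append(active, e.name)
	}
	return active
}

func main() {
	electra, fulu := uint64(500), uint64(600) // hypothetical fork epochs
	entries := []entry{
		{name: "deneb blobs (6 subnets)", activation: 400, deactivation: &electra},
		{name: "electra blobs (8 subnets)", activation: electra, deactivation: &fulu},
		{name: "fulu data columns", activation: fulu},
	}
	for _, epoch := range []uint64{450, 500, 600, 700} {
		fmt.Println("epoch", epoch, "->", familiesFor(epoch, entries))
	}
}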

View File

@@ -0,0 +1,310 @@
package sync
import (
"context"
"testing"
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v6/config/features"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/testing/assert"
)
// createMinimalService creates a minimal Service instance for testing
func createMinimalService(t *testing.T) *Service {
p2pService := p2ptest.NewTestP2P(t)
return &Service{
cfg: &config{
p2p: p2pService,
},
ctx: context.Background(),
}
}
func TestTopicFamiliesForEpoch(t *testing.T) {
// Define test epochs
const (
genesisEpoch = primitives.Epoch(0)
altairEpoch = primitives.Epoch(100)
bellatrixEpoch = primitives.Epoch(200)
capellaEpoch = primitives.Epoch(300)
denebEpoch = primitives.Epoch(400)
electraEpoch = primitives.Epoch(500)
fuluEpoch = primitives.Epoch(600)
)
// Define topic families for each fork
// These names must match what's returned by the Name() method of each topic family
genesisFamilies := []string{
"BlockTopicFamily",
"AggregateAndProofTopicFamily",
"VoluntaryExitTopicFamily",
"ProposerSlashingTopicFamily",
"AttesterSlashingTopicFamily",
"AttestationTopicFamily",
}
altairFamilies := []string{
"SyncContributionAndProofTopicFamily",
"SyncCommitteeTopicFamily",
}
altairLightClientFamilies := []string{
"LightClientOptimisticUpdateTopicFamily",
"LightClientFinalityUpdateTopicFamily",
}
capellaFamilies := []string{
"BlsToExecutionChangeTopicFamily",
}
denebBlobFamilies := []string{
"BlobTopicFamily-0",
"BlobTopicFamily-1",
"BlobTopicFamily-2",
"BlobTopicFamily-3",
"BlobTopicFamily-4",
"BlobTopicFamily-5",
}
electraBlobFamilies := append(append([]string{}, denebBlobFamilies...), "BlobTopicFamily-6", "BlobTopicFamily-7")
fuluFamilies := []string{
"DataColumnTopicFamily",
}
// Helper function to combine fork families
combineForks := func(forkSets ...[]string) []string {
var combined []string
for _, forkSet := range forkSets {
combined = append(combined, forkSet...)
}
return combined
}
tests := []struct {
name string
epoch primitives.Epoch
setupConfig func()
enableLightClient bool
expectedFamilies []string
}{
{
name: "epoch before any fork activation should return empty",
epoch: primitives.Epoch(0),
setupConfig: func() {
config := params.BeaconConfig().Copy()
// Set all fork epochs to future epochs
config.GenesisEpoch = primitives.Epoch(1000)
config.AltairForkEpoch = primitives.Epoch(2000)
config.BellatrixForkEpoch = primitives.Epoch(3000)
config.CapellaForkEpoch = primitives.Epoch(4000)
config.DenebForkEpoch = primitives.Epoch(5000)
config.ElectraForkEpoch = primitives.Epoch(6000)
config.FuluForkEpoch = primitives.Epoch(7000)
params.OverrideBeaconConfig(config)
},
expectedFamilies: []string{},
},
{
name: "epoch at genesis should return genesis topic families",
epoch: genesisEpoch,
setupConfig: func() {
config := params.BeaconConfig().Copy()
config.GenesisEpoch = genesisEpoch
config.AltairForkEpoch = altairEpoch
config.BellatrixForkEpoch = bellatrixEpoch
config.CapellaForkEpoch = capellaEpoch
config.DenebForkEpoch = denebEpoch
config.ElectraForkEpoch = electraEpoch
config.FuluForkEpoch = fuluEpoch
params.OverrideBeaconConfig(config)
},
expectedFamilies: genesisFamilies,
},
{
name: "epoch at Altair without light client should have genesis + Altair families",
epoch: altairEpoch,
enableLightClient: false,
setupConfig: func() {
config := params.BeaconConfig().Copy()
config.GenesisEpoch = genesisEpoch
config.AltairForkEpoch = altairEpoch
config.BellatrixForkEpoch = bellatrixEpoch
config.CapellaForkEpoch = capellaEpoch
config.DenebForkEpoch = denebEpoch
config.ElectraForkEpoch = electraEpoch
config.FuluForkEpoch = fuluEpoch
params.OverrideBeaconConfig(config)
},
expectedFamilies: combineForks(genesisFamilies, altairFamilies),
},
{
name: "epoch at Altair with light client enabled should include light client families",
epoch: altairEpoch,
enableLightClient: true,
setupConfig: func() {
config := params.BeaconConfig().Copy()
config.GenesisEpoch = genesisEpoch
config.AltairForkEpoch = altairEpoch
config.BellatrixForkEpoch = bellatrixEpoch
config.CapellaForkEpoch = capellaEpoch
config.DenebForkEpoch = denebEpoch
config.ElectraForkEpoch = electraEpoch
config.FuluForkEpoch = fuluEpoch
params.OverrideBeaconConfig(config)
},
expectedFamilies: combineForks(genesisFamilies, altairFamilies, altairLightClientFamilies),
},
{
name: "epoch at Capella should have genesis + Altair + Capella families",
epoch: capellaEpoch,
setupConfig: func() {
config := params.BeaconConfig().Copy()
config.GenesisEpoch = genesisEpoch
config.AltairForkEpoch = altairEpoch
config.BellatrixForkEpoch = bellatrixEpoch
config.CapellaForkEpoch = capellaEpoch
config.DenebForkEpoch = denebEpoch
config.ElectraForkEpoch = electraEpoch
config.FuluForkEpoch = fuluEpoch
params.OverrideBeaconConfig(config)
},
expectedFamilies: combineForks(genesisFamilies, altairFamilies, capellaFamilies),
},
{
name: "epoch at Deneb should include blob sidecars",
epoch: denebEpoch,
setupConfig: func() {
config := params.BeaconConfig().Copy()
config.GenesisEpoch = genesisEpoch
config.AltairForkEpoch = altairEpoch
config.BellatrixForkEpoch = bellatrixEpoch
config.CapellaForkEpoch = capellaEpoch
config.DenebForkEpoch = denebEpoch
config.ElectraForkEpoch = electraEpoch
config.FuluForkEpoch = fuluEpoch
config.BlobsidecarSubnetCount = 6 // Deneb has 6 blob subnets
params.OverrideBeaconConfig(config)
},
expectedFamilies: combineForks(genesisFamilies, altairFamilies, capellaFamilies, denebBlobFamilies),
},
{
name: "epoch at Electra should have Electra blobs not Deneb blobs",
epoch: electraEpoch,
setupConfig: func() {
config := params.BeaconConfig().Copy()
config.GenesisEpoch = genesisEpoch
config.AltairForkEpoch = altairEpoch
config.BellatrixForkEpoch = bellatrixEpoch
config.CapellaForkEpoch = capellaEpoch
config.DenebForkEpoch = denebEpoch
config.ElectraForkEpoch = electraEpoch
config.FuluForkEpoch = fuluEpoch
config.BlobsidecarSubnetCount = 6
config.BlobsidecarSubnetCountElectra = 8 // Electra has 8 blob subnets
params.OverrideBeaconConfig(config)
},
expectedFamilies: combineForks(genesisFamilies, altairFamilies, capellaFamilies, electraBlobFamilies),
},
{
name: "epoch at Fulu should have data columns not blobs",
epoch: fuluEpoch,
setupConfig: func() {
config := params.BeaconConfig().Copy()
config.GenesisEpoch = genesisEpoch
config.AltairForkEpoch = altairEpoch
config.BellatrixForkEpoch = bellatrixEpoch
config.CapellaForkEpoch = capellaEpoch
config.DenebForkEpoch = denebEpoch
config.ElectraForkEpoch = electraEpoch
config.FuluForkEpoch = fuluEpoch
config.BlobsidecarSubnetCount = 6
config.BlobsidecarSubnetCountElectra = 8
params.OverrideBeaconConfig(config)
},
expectedFamilies: combineForks(genesisFamilies, altairFamilies, capellaFamilies, fuluFamilies),
},
{
name: "epoch after Fulu should maintain Fulu families",
epoch: fuluEpoch + 100,
setupConfig: func() {
config := params.BeaconConfig().Copy()
config.GenesisEpoch = genesisEpoch
config.AltairForkEpoch = altairEpoch
config.BellatrixForkEpoch = bellatrixEpoch
config.CapellaForkEpoch = capellaEpoch
config.DenebForkEpoch = denebEpoch
config.ElectraForkEpoch = electraEpoch
config.FuluForkEpoch = fuluEpoch
config.BlobsidecarSubnetCount = 6
config.BlobsidecarSubnetCountElectra = 8
params.OverrideBeaconConfig(config)
},
expectedFamilies: combineForks(genesisFamilies, altairFamilies, capellaFamilies, fuluFamilies),
},
{
name: "edge case - epoch exactly at deactivation should not include deactivated family",
epoch: electraEpoch, // This deactivates Deneb blobs
setupConfig: func() {
config := params.BeaconConfig().Copy()
config.GenesisEpoch = genesisEpoch
config.AltairForkEpoch = altairEpoch
config.BellatrixForkEpoch = bellatrixEpoch
config.CapellaForkEpoch = capellaEpoch
config.DenebForkEpoch = denebEpoch
config.ElectraForkEpoch = electraEpoch
config.FuluForkEpoch = fuluEpoch
config.BlobsidecarSubnetCount = 6
config.BlobsidecarSubnetCountElectra = 8
params.OverrideBeaconConfig(config)
},
expectedFamilies: combineForks(genesisFamilies, altairFamilies, capellaFamilies, electraBlobFamilies),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
params.SetupTestConfigCleanup(t)
if tt.enableLightClient {
resetFlags := features.InitWithReset(&features.Flags{
EnableLightClient: true,
})
defer resetFlags()
}
tt.setupConfig()
service := createMinimalService(t)
families := TopicFamiliesForEpoch(tt.epoch, service, params.NetworkScheduleEntry{})
// Collect actual family names
actualFamilies := make([]string, 0, len(families))
for _, family := range families {
actualFamilies = append(actualFamilies, family.Name())
}
// Assert exact match - families should have exactly the expected families and nothing more
assert.Equal(t, len(tt.expectedFamilies), len(actualFamilies),
"Expected %d families but got %d", len(tt.expectedFamilies), len(actualFamilies))
// Create a map for efficient lookup
expectedMap := make(map[string]bool)
for _, expected := range tt.expectedFamilies {
expectedMap[expected] = true
}
// Check each actual family is expected
for _, actual := range actualFamilies {
if !expectedMap[actual] {
t.Errorf("Unexpected topic family found: %s", actual)
}
delete(expectedMap, actual) // Remove from map as we find it
}
// Check all expected families were found (anything left in map was missing)
for missing := range expectedMap {
t.Errorf("Expected topic family not found: %s", missing)
}
})
}
}

View File

@@ -329,6 +329,8 @@ func TestHandshakeHandlers_Roundtrip(t *testing.T) {
chainStarted: abool.New(),
subHandler: newSubTopicHandler(),
}
r.gossipsubController = NewGossipsubController(ctx, r)
markInitSyncComplete(t, r)
clock := startup.NewClockSynchronizer()
require.NoError(t, clock.SetClock(startup.NewClock(time.Now(), [32]byte{})))
@@ -945,6 +947,8 @@ func TestStatusRPCRequest_BadPeerHandshake(t *testing.T) {
chainStarted: abool.New(),
subHandler: newSubTopicHandler(),
}
r.gossipsubController = NewGossipsubController(ctx, r)
markInitSyncComplete(t, r)
clock := startup.NewClockSynchronizer()
require.NoError(t, clock.SetClock(startup.NewClock(time.Now(), [32]byte{})))

View File

@@ -181,7 +181,7 @@ type Service struct {
lcStore *lightClient.Store
dataColumnLogCh chan dataColumnLogEntry
digestActions perDigestSet
subscriptionSpawner func(func()) // see Service.spawn for details
gossipsubController *GossipsubController
}
// NewService initializes new regular sync service.
@@ -198,6 +198,7 @@ func NewService(ctx context.Context, opts ...Option) *Service {
dataColumnLogCh: make(chan dataColumnLogEntry, 1000),
reconstructionRandGen: rand.NewGenerator(),
}
r.gossipsubController = NewGossipsubController(ctx, r)
for _, opt := range opts {
if err := opt(r); err != nil {
@@ -326,6 +327,10 @@ func (s *Service) Stop() error {
for _, t := range s.cfg.p2p.PubSub().GetTopics() {
s.unSubscribeFromTopic(t)
}
// Stop the gossipsub controller.
s.gossipsubController.Stop()
return nil
}
@@ -405,7 +410,10 @@ func (s *Service) startDiscoveryAndSubscriptions() {
}
// Start the fork watcher.
go s.p2pHandlerControlLoop()
go s.p2pRPCHandlerControlLoop()
// Start the gossipsub controller.
go s.gossipsubController.Start()
}
func (s *Service) writeErrorResponseToStream(responseCode byte, reason string, stream libp2pcore.Stream) {

View File

@@ -2,6 +2,7 @@ package sync
import (
"context"
"fmt"
"sync"
"testing"
"time"
@@ -16,6 +17,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
state_native "github.com/OffchainLabs/prysm/v6/beacon-chain/state/state-native"
mockSync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync/initial-sync/testing"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
leakybucket "github.com/OffchainLabs/prysm/v6/container/leaky-bucket"
"github.com/OffchainLabs/prysm/v6/crypto/bls"
@@ -67,8 +69,9 @@ func TestSyncHandlers_WaitToSync(t *testing.T) {
chainStarted: abool.New(),
clockWaiter: gs,
}
r.gossipsubController = NewGossipsubController(context.Background(), &r)
topic := "/eth2/%x/beacon_block"
topicFmt := "/eth2/%x/beacon_block"
go r.startDiscoveryAndSubscriptions()
time.Sleep(100 * time.Millisecond)
@@ -82,7 +85,10 @@ func TestSyncHandlers_WaitToSync(t *testing.T) {
msg := util.NewBeaconBlock()
msg.Block.ParentRoot = util.Random32Bytes(t)
msg.Signature = sk.Sign([]byte("data")).Marshal()
p2p.ReceivePubSub(topic, msg)
// Build full topic using current fork digest
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
fullTopic := fmt.Sprintf(topicFmt, nse.ForkDigest) + p2p.Encoding().ProtocolSuffix()
p2p.ReceivePubSub(fullTopic, msg)
// wait for chainstart to be sent
time.Sleep(400 * time.Millisecond)
require.Equal(t, true, r.chainStarted.IsSet(), "Did not receive chain start event.")
@@ -137,6 +143,7 @@ func TestSyncHandlers_WaitTillSynced(t *testing.T) {
clockWaiter: gs,
initialSyncComplete: make(chan struct{}),
}
r.gossipsubController = NewGossipsubController(context.Background(), &r)
r.initCaches()
var vr [32]byte
@@ -169,14 +176,16 @@ func TestSyncHandlers_WaitTillSynced(t *testing.T) {
// Save block into DB so that validateBeaconBlockPubSub() process gets short cut.
util.SaveBlock(t, ctx, r.cfg.beaconDB, msg)
topic := "/eth2/%x/beacon_block"
p2p.ReceivePubSub(topic, msg)
topicFmt := "/eth2/%x/beacon_block"
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
fullTopic := fmt.Sprintf(topicFmt, nse.ForkDigest) + p2p.Encoding().ProtocolSuffix()
p2p.ReceivePubSub(fullTopic, msg)
assert.Equal(t, 0, len(blockChan), "block was received by sync service despite not being fully synced")
close(r.initialSyncComplete)
<-syncCompleteCh
p2p.ReceivePubSub(topic, msg)
p2p.ReceivePubSub(fullTopic, msg)
select {
case <-blockChan:
@@ -206,6 +215,7 @@ func TestSyncService_StopCleanly(t *testing.T) {
clockWaiter: gs,
initialSyncComplete: make(chan struct{}),
}
r.gossipsubController = NewGossipsubController(context.Background(), &r)
markInitSyncComplete(t, &r)
go r.startDiscoveryAndSubscriptions()
@@ -265,6 +275,7 @@ func TestService_Stop_SendsGoodbyeMessages(t *testing.T) {
cancel: cancel,
rateLimiter: newRateLimiter(p1),
}
r.gossipsubController = NewGossipsubController(ctx, r)
// Initialize context map for RPC
ctxMap, err := ContextByteVersionsForValRoot(chain.ValidatorsRoot)
@@ -343,6 +354,7 @@ func TestService_Stop_TimeoutHandling(t *testing.T) {
cancel: cancel,
rateLimiter: newRateLimiter(p1),
}
r.gossipsubController = NewGossipsubController(ctx, r)
// Initialize context map for RPC
ctxMap, err := ContextByteVersionsForValRoot(chain.ValidatorsRoot)
@@ -404,6 +416,7 @@ func TestService_Stop_ConcurrentGoodbyeMessages(t *testing.T) {
cancel: cancel,
rateLimiter: newRateLimiter(p1),
}
r.gossipsubController = NewGossipsubController(ctx, r)
// Initialize context map for RPC
ctxMap, err := ContextByteVersionsForValRoot(chain.ValidatorsRoot)

View File

@@ -38,64 +38,27 @@ const pubsubMessageTimeout = 30 * time.Second
var errInvalidDigest = errors.New("invalid digest")
// wrappedVal represents a gossip validator which also returns an error along with the result.
type wrappedVal func(context.Context, peer.ID, *pubsub.Message) (pubsub.ValidationResult, error)
// subHandler represents the handler for a given subscription.
type subHandler func(context.Context, proto.Message) error
// noopHandler is used for subscriptions that do not require anything to be done.
var noopHandler subHandler = func(ctx context.Context, msg proto.Message) error {
return nil
}
// subscribeParameters holds the parameters needed to construct a set of subscription topics for a given
// set of gossipsub subnets.
type subscribeParameters struct {
topicFormat string
validate wrappedVal
handle subHandler
nse params.NetworkScheduleEntry
// getSubnetsToJoin is a function that returns all subnets the node should join.
getSubnetsToJoin func(currentSlot primitives.Slot) map[uint64]bool
// getSubnetsRequiringPeers is a function that returns all subnets that require peers to be found
// but for which no subscriptions are needed.
getSubnetsRequiringPeers func(currentSlot primitives.Slot) map[uint64]bool
}
// shortTopic is a less verbose version of topic strings used for logging.
func (p subscribeParameters) shortTopic() string {
short := p.topicFormat
fmtLen := len(short)
if fmtLen >= 3 && short[fmtLen-3:] == "_%d" {
short = short[:fmtLen-3]
}
return fmt.Sprintf(short, p.nse.ForkDigest)
}
func (p subscribeParameters) logFields() logrus.Fields {
func familyLogFields(tf GossipsubTopicFamilyWithDynamicSubnets) logrus.Fields {
nse := tf.NetworkScheduleEntry()
return logrus.Fields{
"topic": p.shortTopic(),
"topicFamily": fmt.Sprintf("%T", tf),
"digest": nse.ForkDigest,
"forkEpoch": nse.Epoch,
}
}
// fullTopic is the fully qualified topic string, given to gossipsub.
func (p subscribeParameters) fullTopic(subnet uint64, suffix string) string {
return fmt.Sprintf(p.topicFormat, p.nse.ForkDigest, subnet) + suffix
}
// subnetTracker keeps track of which subnets we are subscribed to, out of the set of
// possible subnets described by a `subscribeParameters`.
// subnetTracker keeps track of which subnets we are subscribed to for a given
// dynamic topic family (attestations, sync-committee, data-column, etc.).
type subnetTracker struct {
subscribeParameters
family GossipsubTopicFamilyWithDynamicSubnets
mu sync.RWMutex
subscriptions map[uint64]*pubsub.Subscription
}
func newSubnetTracker(p subscribeParameters) *subnetTracker {
func newSubnetTracker(tf GossipsubTopicFamilyWithDynamicSubnets) *subnetTracker {
return &subnetTracker{
subscribeParameters: p,
subscriptions: make(map[uint64]*pubsub.Subscription),
family: tf,
subscriptions: make(map[uint64]*pubsub.Subscription),
}
}
@@ -192,184 +155,12 @@ func (s *Service) activeSyncSubnetIndices(currentSlot primitives.Slot) map[uint6
return mapFromSlice(subscriptions)
}
// spawn allows the Service to use a custom function for launching goroutines.
// This is useful in tests where we can set spawner to a sync.WaitGroup and
// wait for the spawned goroutines to finish.
func (s *Service) spawn(f func()) {
if s.subscriptionSpawner != nil {
s.subscriptionSpawner(f)
} else {
go f()
}
}
// Register PubSub subscribers
func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
// If we have already registered for this fork digest, exit early.
if s.digestActionDone(nse.ForkDigest, registerGossipOnce) {
return false
}
s.spawn(func() {
s.subscribe(p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, nse)
})
s.spawn(func() {
s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, nse)
})
s.spawn(func() {
s.subscribe(p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber, nse)
})
s.spawn(func() {
s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber, nse)
})
s.spawn(func() {
s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber, nse)
})
s.spawn(func() {
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.AttestationSubnetTopicFormat,
validate: s.validateCommitteeIndexBeaconAttestation,
handle: s.committeeIndexBeaconAttestationSubscriber,
getSubnetsToJoin: s.persistentAndAggregatorSubnetIndices,
getSubnetsRequiringPeers: attesterSubnetIndices,
nse: nse,
})
})
// New gossip topic in Altair
if params.BeaconConfig().AltairForkEpoch <= nse.Epoch {
s.spawn(func() {
s.subscribe(
p2p.SyncContributionAndProofSubnetTopicFormat,
s.validateSyncContributionAndProof,
s.syncContributionAndProofSubscriber,
nse,
)
})
s.spawn(func() {
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.SyncCommitteeSubnetTopicFormat,
validate: s.validateSyncCommitteeMessage,
handle: s.syncCommitteeMessageSubscriber,
getSubnetsToJoin: s.activeSyncSubnetIndices,
nse: nse,
})
})
if features.Get().EnableLightClient {
s.spawn(func() {
s.subscribe(
p2p.LightClientOptimisticUpdateTopicFormat,
s.validateLightClientOptimisticUpdate,
noopHandler,
nse,
)
})
s.spawn(func() {
s.subscribe(
p2p.LightClientFinalityUpdateTopicFormat,
s.validateLightClientFinalityUpdate,
noopHandler,
nse,
)
})
}
}
// New gossip topic in Capella
if params.BeaconConfig().CapellaForkEpoch <= nse.Epoch {
s.spawn(func() {
s.subscribe(
p2p.BlsToExecutionChangeSubnetTopicFormat,
s.validateBlsToExecutionChange,
s.blsToExecutionChangeSubscriber,
nse,
)
})
}
// New gossip topic in Deneb, removed in Electra
if params.BeaconConfig().DenebForkEpoch <= nse.Epoch && nse.Epoch < params.BeaconConfig().ElectraForkEpoch {
s.spawn(func() {
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.BlobSubnetTopicFormat,
validate: s.validateBlob,
handle: s.blobSubscriber,
nse: nse,
getSubnetsToJoin: func(primitives.Slot) map[uint64]bool {
return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCount)
},
})
})
}
// New gossip topic in Electra, removed in Fulu
if params.BeaconConfig().ElectraForkEpoch <= nse.Epoch && nse.Epoch < params.BeaconConfig().FuluForkEpoch {
s.spawn(func() {
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.BlobSubnetTopicFormat,
validate: s.validateBlob,
handle: s.blobSubscriber,
nse: nse,
getSubnetsToJoin: func(currentSlot primitives.Slot) map[uint64]bool {
return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCountElectra)
},
})
})
}
// New gossip topic in Fulu.
if params.BeaconConfig().FuluForkEpoch <= nse.Epoch {
s.spawn(func() {
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.DataColumnSubnetTopicFormat,
validate: s.validateDataColumn,
handle: s.dataColumnSubscriber,
nse: nse,
getSubnetsToJoin: s.dataColumnSubnetIndices,
getSubnetsRequiringPeers: s.allDataColumnSubnets,
})
})
}
return true
}
func (s *Service) subscriptionRequestExpired(nse params.NetworkScheduleEntry) bool {
next := params.NextNetworkScheduleEntry(nse.Epoch)
return next.Epoch != nse.Epoch && s.cfg.clock.CurrentEpoch() > next.Epoch
}
func (s *Service) subscribeLogFields(topic string, nse params.NetworkScheduleEntry) logrus.Fields {
return logrus.Fields{
"topic": topic,
"digest": nse.ForkDigest,
"forkEpoch": nse.Epoch,
"currentEpoch": s.cfg.clock.CurrentEpoch(),
}
}
// subscribe to a given topic with a given validator and subscription handler.
// The base protobuf message is used to initialize new messages for decoding.
func (s *Service) subscribe(topic string, validator wrappedVal, handle subHandler, nse params.NetworkScheduleEntry) {
if err := s.waitForInitialSync(s.ctx); err != nil {
log.WithFields(s.subscribeLogFields(topic, nse)).WithError(err).Debug("Context cancelled while waiting for initial sync, not subscribing to topic")
return
}
// Check if this subscribe request is still valid - we may have crossed another fork epoch while waiting for initial sync.
if s.subscriptionRequestExpired(nse) {
// If we are already past the next fork epoch, do not subscribe to this topic.
log.WithFields(s.subscribeLogFields(topic, nse)).Debug("Not subscribing to topic as we are already past the next fork epoch")
return
}
base := p2p.GossipTopicMappings(topic, nse.Epoch)
if base == nil {
// Impossible condition, as it would mean the topic does not exist.
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) // lint:nopanic -- Impossible condition.
}
s.subscribeWithBase(s.addDigestToTopic(topic, nse.ForkDigest), validator, handle)
}
func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle subHandler) *pubsub.Subscription {
topic += s.cfg.p2p.Encoding().ProtocolSuffix()
func (s *Service) subscribe(topic string, validator wrappedVal, handle subHandler) *pubsub.Subscription {
log := log.WithField("topic", topic)
// Do not resubscribe already seen subscriptions.
@@ -532,36 +323,38 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p
func (s *Service) pruneNotWanted(t *subnetTracker, wantedSubnets map[uint64]bool) {
for _, subnet := range t.unwanted(wantedSubnets) {
t.cancelSubscription(subnet)
s.unSubscribeFromTopic(t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix()))
s.unSubscribeFromTopic(t.family.GetFullTopicString(subnet))
}
}
// subscribeWithParameters subscribes to a list of subnets.
func (s *Service) subscribeWithParameters(p subscribeParameters) {
// subscribeToDynamicSubnetFamily subscribes to a list of subnets.
func (s *Service) subscribeToDynamicSubnetFamily(tf GossipsubTopicFamilyWithDynamicSubnets) *subnetTracker {
tracker := newSubnetTracker(tf)
go s.subscribeToSubnets(tf, tracker)
return tracker
}
func (s *Service) subscribeToSubnets(tf GossipsubTopicFamilyWithDynamicSubnets, tracker *subnetTracker) {
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
tracker := newSubnetTracker(p)
go s.ensurePeers(ctx, tracker)
go s.logMinimumPeersPerSubnet(ctx, p)
go s.logMinimumPeersPerSubnet(ctx, tf)
if err := s.waitForInitialSync(ctx); err != nil {
log.WithFields(p.logFields()).WithError(err).Debug("Could not subscribe to subnets as initial sync failed")
return
}
s.trySubscribeSubnets(tracker)
slotTicker := slots.NewSlotTicker(s.cfg.clock.GenesisTime(), params.BeaconConfig().SecondsPerSlot)
defer slotTicker.Done()
for {
select {
case <-slotTicker.C():
// Check if this subscribe request is still valid - we may have crossed another fork epoch while waiting for initial sync.
if s.subscriptionRequestExpired(p.nse) {
if s.subscriptionRequestExpired(tf.NetworkScheduleEntry()) {
// If we are already past the next fork epoch, do not subscribe to this topic.
log.WithFields(logrus.Fields{
"topic": p.shortTopic(),
"digest": p.nse.ForkDigest,
"epoch": p.nse.Epoch,
"topicFamily": fmt.Sprintf("%T", tf),
"digest": tf.NetworkScheduleEntry().ForkDigest,
"epoch": tf.NetworkScheduleEntry().Epoch,
"currentEpoch": s.cfg.clock.CurrentEpoch(),
}).Debug("Exiting topic subnet subscription loop")
return
@@ -576,12 +369,11 @@ func (s *Service) subscribeWithParameters(p subscribeParameters) {
// trySubscribeSubnets attempts to subscribe to any missing subnets that we should be subscribed to,
// but only once initial sync is complete.
func (s *Service) trySubscribeSubnets(t *subnetTracker) {
subnetsToJoin := t.getSubnetsToJoin(s.cfg.clock.CurrentSlot())
subnetsToJoin := t.family.GetSubnetsToJoin(s.cfg.clock.CurrentSlot())
s.pruneNotWanted(t, subnetsToJoin)
for _, subnet := range t.missing(subnetsToJoin) {
// TODO: subscribeWithBase appends the protocol suffix, other methods don't. Make this consistent.
topic := t.fullTopic(subnet, "")
t.track(subnet, s.subscribeWithBase(topic, t.validate, t.handle))
topic := t.family.GetFullTopicString(subnet)
t.track(subnet, s.subscribe(topic, t.family.Validator(), t.family.Handler()))
}
}
@@ -604,17 +396,18 @@ func (s *Service) ensurePeers(ctx context.Context, tracker *subnetTracker) {
func (s *Service) tryEnsurePeers(ctx context.Context, tracker *subnetTracker) {
timeout := (time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second) - 100*time.Millisecond
minPeers := flags.Get().MinimumPeersPerSubnet
neededSubnets := computeAllNeededSubnets(s.cfg.clock.CurrentSlot(), tracker.getSubnetsToJoin, tracker.getSubnetsRequiringPeers)
neededSubnets := computeAllNeededSubnets(s.cfg.clock.CurrentSlot(), tracker.family)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
err := s.cfg.p2p.FindAndDialPeersWithSubnets(ctx, tracker.topicFormat, tracker.nse.ForkDigest, minPeers, neededSubnets)
builder := func(idx uint64) string { return tracker.family.GetFullTopicString(idx) }
err := s.cfg.p2p.FindAndDialPeersWithSubnets(ctx, builder, minPeers, neededSubnets)
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
log.WithFields(tracker.logFields()).WithError(err).Debug("Could not find peers with subnets")
log.WithFields(familyLogFields(tracker.family)).WithError(err).Debug("Could not find peers with subnets")
}
}
func (s *Service) logMinimumPeersPerSubnet(ctx context.Context, p subscribeParameters) {
logFields := p.logFields()
func (s *Service) logMinimumPeersPerSubnet(ctx context.Context, tf GossipsubTopicFamilyWithDynamicSubnets) {
logFields := familyLogFields(tf)
minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet
// Warn the user if we are not subscribed to enough peers in the subnets.
log := log.WithField("minimum", minimumPeersPerSubnet)
@@ -625,12 +418,12 @@ func (s *Service) logMinimumPeersPerSubnet(ctx context.Context, p subscribeParam
select {
case <-logTicker.C:
currentSlot := s.cfg.clock.CurrentSlot()
subnetsToFindPeersIndex := computeAllNeededSubnets(currentSlot, p.getSubnetsToJoin, p.getSubnetsRequiringPeers)
subnetsToFindPeersIndex := computeAllNeededSubnets(currentSlot, tf)
isSubnetWithMissingPeers := false
// Find new peers for wanted subnets if needed.
for index := range subnetsToFindPeersIndex {
topic := fmt.Sprintf(p.topicFormat, p.nse.ForkDigest, index)
topic := tf.GetFullTopicString(index)
// Check if we have enough peers in the subnet. Skip if we do.
if count := s.connectedPeersCount(topic); count < minimumPeersPerSubnet {
@@ -666,9 +459,8 @@ func (s *Service) unSubscribeFromTopic(topic string) {
}
// connectedPeersCount counts how many peers for a given topic are connected to the node.
func (s *Service) connectedPeersCount(subnetTopic string) int {
topic := subnetTopic + s.cfg.p2p.Encoding().ProtocolSuffix()
peersWithSubnet := s.cfg.p2p.PubSub().ListPeers(topic)
func (s *Service) connectedPeersCount(fullTopic string) int {
peersWithSubnet := s.cfg.p2p.PubSub().ListPeers(fullTopic)
return len(peersWithSubnet)
}
@@ -818,17 +610,13 @@ func isDigestValid(digest [4]byte, clock *startup.Clock) (bool, error) {
// and the subnets for which we want to find peers.
func computeAllNeededSubnets(
currentSlot primitives.Slot,
getSubnetsToJoin func(currentSlot primitives.Slot) map[uint64]bool,
getSubnetsRequiringPeers func(currentSlot primitives.Slot) map[uint64]bool,
dtf GossipsubTopicFamilyWithDynamicSubnets,
) map[uint64]bool {
// Retrieve the subnets we want to join.
subnetsToJoin := getSubnetsToJoin(currentSlot)
subnetsToJoin := dtf.GetSubnetsToJoin(currentSlot)
// Retrieve the subnets we want to find peers into.
subnetsRequiringPeers := make(map[uint64]bool)
if getSubnetsRequiringPeers != nil {
subnetsRequiringPeers = getSubnetsRequiringPeers(currentSlot)
}
subnetsRequiringPeers := dtf.GetSubnetsForBroadcast(currentSlot)
// Combine the two maps to get all needed subnets.
neededSubnets := make(map[uint64]bool, len(subnetsToJoin)+len(subnetsRequiringPeers))
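The hunk is truncated at this point; presumably computeAllNeededSubnets finishes by taking the union of the two maps. A minimal sketch of that remainder, assuming nothing beyond a plain union (not part of the diff itself):
// Sketch, not from the diff: union the subnets to join with the subnets that only need peers.
for subnet := range subnetsToJoin {
neededSubnets[subnet] = true
}
for subnet := range subnetsRequiringPeers {
neededSubnets[subnet] = true
}
return neededSubnets
}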

View File

@@ -30,6 +30,7 @@ import (
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/ethereum/go-ethereum/p2p/enode"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/peer"
@@ -37,6 +38,115 @@ import (
"google.golang.org/protobuf/proto"
)
// testStaticFamily implements a minimal static topic family for tests.
type testStaticFamily struct {
nse params.NetworkScheduleEntry
topicFmt string
protocolSuffix string
validator wrappedVal
handler subHandler
}
func (t testStaticFamily) Validator() wrappedVal {
return t.validator
}
func (t testStaticFamily) Handler() subHandler {
return t.handler
}
func (t testStaticFamily) NetworkScheduleEntry() params.NetworkScheduleEntry {
return t.nse
}
func (t testStaticFamily) GetFullTopicString() string {
return fmt.Sprintf(t.topicFmt, t.nse.ForkDigest) + t.protocolSuffix
}
func (t testStaticFamily) Subscribe() {}
func (t testStaticFamily) Unsubscribe() {}
func makeTestFamily(nse params.NetworkScheduleEntry, topicFmt, suffix string, validator wrappedVal, handler subHandler) testStaticFamily {
return testStaticFamily{
nse: nse,
topicFmt: topicFmt,
protocolSuffix: suffix,
validator: validator,
handler: handler,
}
}
func makeFullTopic(topicFmt string, nse params.NetworkScheduleEntry, suffix string) string {
return fmt.Sprintf(topicFmt, nse.ForkDigest) + suffix
}
// testDynamicFamily implements a minimal dynamic topic family for tests.
type testDynamicFamily struct {
nse params.NetworkScheduleEntry
topicFmt string
protocolSuffix string
validator wrappedVal
handler subHandler
subnetsToJoin func(primitives.Slot) map[uint64]bool
subnetsForCast func(primitives.Slot) map[uint64]bool
}
func (t *testDynamicFamily) Name() string {
return "TestDynamicFamily"
}
func (t *testDynamicFamily) Validator() wrappedVal {
return t.validator
}
func (t *testDynamicFamily) Handler() subHandler {
return t.handler
}
func (t *testDynamicFamily) NetworkScheduleEntry() params.NetworkScheduleEntry {
return t.nse
}
func (t *testDynamicFamily) GetFullTopicString(subnet uint64) string {
return fmt.Sprintf(t.topicFmt, t.nse.ForkDigest, subnet) + t.protocolSuffix
}
func (t *testDynamicFamily) GetSubnetsToJoin(slot primitives.Slot) map[uint64]bool {
if t.subnetsToJoin != nil {
return t.subnetsToJoin(slot)
}
return nil
}
func (t *testDynamicFamily) GetSubnetsForBroadcast(slot primitives.Slot) map[uint64]bool {
if t.subnetsForCast != nil {
return t.subnetsForCast(slot)
}
return nil
}
func (t *testDynamicFamily) Subscribe() {}
func (t *testDynamicFamily) Unsubscribe() {}
func (t *testDynamicFamily) GetTopicsForNode(_ *enode.Node) ([]string, error) {
return nil, nil
}
func makeTestDynamicFamily(nse params.NetworkScheduleEntry, topicFmt, suffix string, validator wrappedVal, handler subHandler,
getJoin func(primitives.Slot) map[uint64]bool, getCast func(primitives.Slot) map[uint64]bool) *testDynamicFamily {
return &testDynamicFamily{
nse: nse,
topicFmt: topicFmt,
protocolSuffix: suffix,
validator: validator,
handler: handler,
subnetsToJoin: getJoin,
subnetsForCast: getCast,
}
}
func TestSubscribe_ReceivesValidMessage(t *testing.T) {
p2pService := p2ptest.NewTestP2P(t)
gt := time.Now()
@@ -64,7 +174,7 @@ func TestSubscribe_ReceivesValidMessage(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
r.subscribe(topic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
tf := makeTestFamily(nse, topic, r.cfg.p2p.Encoding().ProtocolSuffix(), r.noopValidator, func(_ context.Context, msg proto.Message) error {
m, ok := msg.(*pb.SignedVoluntaryExit)
assert.Equal(t, true, ok, "Object is not of type *pb.SignedVoluntaryExit")
if m.Exit == nil || m.Exit.Epoch != 55 {
@@ -72,10 +182,10 @@ func TestSubscribe_ReceivesValidMessage(t *testing.T) {
}
wg.Done()
return nil
}, nse)
})
r.subscribe(tf.GetFullTopicString(), tf.Validator(), tf.Handler())
r.markForChainStart()
p2pService.ReceivePubSub(topic, &pb.SignedVoluntaryExit{Exit: &pb.VoluntaryExit{Epoch: 55}, Signature: make([]byte, fieldparams.BLSSignatureLength)})
p2pService.ReceivePubSub(tf.GetFullTopicString(), &pb.SignedVoluntaryExit{Exit: &pb.VoluntaryExit{Epoch: 55}, Signature: make([]byte, fieldparams.BLSSignatureLength)})
if util.WaitTimeout(&wg, time.Second) {
t.Fatal("Did not receive PubSub in 1 second")
@@ -110,12 +220,10 @@ func TestSubscribe_UnsubscribeTopic(t *testing.T) {
p2pService.Digest = nse.ForkDigest
topic := "/eth2/%x/voluntary_exit"
r.subscribe(topic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
return nil
}, nse)
tf := makeTestFamily(nse, topic, r.cfg.p2p.Encoding().ProtocolSuffix(), r.noopValidator, func(_ context.Context, msg proto.Message) error { return nil })
r.subscribe(tf.GetFullTopicString(), tf.Validator(), tf.Handler())
r.markForChainStart()
fullTopic := fmt.Sprintf(topic, p2pService.Digest) + p2pService.Encoding().ProtocolSuffix()
fullTopic := tf.GetFullTopicString()
assert.Equal(t, true, r.subHandler.topicExists(fullTopic))
topics := p2pService.PubSub().GetTopics()
assert.Equal(t, fullTopic, topics[0])
@@ -162,11 +270,12 @@ func TestSubscribe_ReceivesAttesterSlashing(t *testing.T) {
wg.Add(1)
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
p2pService.Digest = nse.ForkDigest
r.subscribe(topic, r.noopValidator, func(ctx context.Context, msg proto.Message) error {
tf := makeTestFamily(nse, topic, r.cfg.p2p.Encoding().ProtocolSuffix(), r.noopValidator, func(ctx context.Context, msg proto.Message) error {
require.NoError(t, r.attesterSlashingSubscriber(ctx, msg))
wg.Done()
return nil
}, nse)
})
r.subscribe(tf.GetFullTopicString(), tf.Validator(), tf.Handler())
beaconState, privKeys := util.DeterministicGenesisState(t, 64)
chainService.State = beaconState
r.markForChainStart()
@@ -178,7 +287,7 @@ func TestSubscribe_ReceivesAttesterSlashing(t *testing.T) {
require.NoError(t, err, "Error generating attester slashing")
err = r.cfg.beaconDB.SaveState(ctx, beaconState, bytesutil.ToBytes32(attesterSlashing.FirstAttestation().GetData().BeaconBlockRoot))
require.NoError(t, err)
p2pService.ReceivePubSub(topic, attesterSlashing)
p2pService.ReceivePubSub(tf.GetFullTopicString(), attesterSlashing)
if util.WaitTimeout(&wg, time.Second) {
t.Fatal("Did not receive PubSub in 1 second")
@@ -217,11 +326,12 @@ func TestSubscribe_ReceivesProposerSlashing(t *testing.T) {
params.OverrideBeaconConfig(params.MainnetConfig())
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
p2pService.Digest = nse.ForkDigest
r.subscribe(topic, r.noopValidator, func(ctx context.Context, msg proto.Message) error {
tf := makeTestFamily(nse, topic, r.cfg.p2p.Encoding().ProtocolSuffix(), r.noopValidator, func(ctx context.Context, msg proto.Message) error {
require.NoError(t, r.proposerSlashingSubscriber(ctx, msg))
wg.Done()
return nil
}, nse)
})
r.subscribe(tf.GetFullTopicString(), tf.Validator(), tf.Handler())
beaconState, privKeys := util.DeterministicGenesisState(t, 64)
chainService.State = beaconState
r.markForChainStart()
@@ -232,7 +342,7 @@ func TestSubscribe_ReceivesProposerSlashing(t *testing.T) {
)
require.NoError(t, err, "Error generating proposer slashing")
p2pService.ReceivePubSub(topic, proposerSlashing)
p2pService.ReceivePubSub(tf.GetFullTopicString(), proposerSlashing)
if util.WaitTimeout(&wg, time.Second) {
t.Fatal("Did not receive PubSub in 1 second")
@@ -266,12 +376,13 @@ func TestSubscribe_HandlesPanic(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
r.subscribe(topic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
tf := makeTestFamily(nse, topic, p.Encoding().ProtocolSuffix(), r.noopValidator, func(_ context.Context, msg proto.Message) error {
defer wg.Done()
panic("bad")
}, nse)
})
r.subscribe(tf.GetFullTopicString(), tf.Validator(), tf.Handler())
r.markForChainStart()
p.ReceivePubSub(topic, &pb.SignedVoluntaryExit{Exit: &pb.VoluntaryExit{Epoch: 55}, Signature: make([]byte, fieldparams.BLSSignatureLength)})
p.ReceivePubSub(tf.GetFullTopicString(), &pb.SignedVoluntaryExit{Exit: &pb.VoluntaryExit{Epoch: 55}, Signature: make([]byte, fieldparams.BLSSignatureLength)})
if util.WaitTimeout(&wg, time.Second) {
t.Fatal("Did not receive PubSub in 1 second")
@@ -297,15 +408,12 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
}
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
params := subscribeParameters{
topicFormat: "/eth2/testing/%#x/committee%d",
nse: nse,
}
tracker := newSubnetTracker(params)
tfDyn := makeTestDynamicFamily(nse, "/eth2/testing/%#x/committee%d", r.cfg.p2p.Encoding().ProtocolSuffix(), r.noopValidator, nil, nil, nil)
tracker := newSubnetTracker(tfDyn)
// committee index 1
c1 := uint64(1)
fullTopic := params.fullTopic(c1, r.cfg.p2p.Encoding().ProtocolSuffix())
fullTopic := tfDyn.GetFullTopicString(c1)
_, topVal := r.wrapAndReportValidation(fullTopic, r.noopValidator)
require.NoError(t, r.cfg.p2p.PubSub().RegisterTopicValidator(fullTopic, topVal))
sub1, err := r.cfg.p2p.SubscribeToTopic(fullTopic)
@@ -314,7 +422,7 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
// committee index 2
c2 := uint64(2)
fullTopic = params.fullTopic(c2, r.cfg.p2p.Encoding().ProtocolSuffix())
fullTopic = tfDyn.GetFullTopicString(c2)
_, topVal = r.wrapAndReportValidation(fullTopic, r.noopValidator)
err = r.cfg.p2p.PubSub().RegisterTopicValidator(fullTopic, topVal)
require.NoError(t, err)
@@ -479,6 +587,7 @@ func TestFilterSubnetPeers(t *testing.T) {
chainStarted: abool.New(),
subHandler: newSubTopicHandler(),
}
r.gossipsubController = NewGossipsubController(ctx, &r)
markInitSyncComplete(t, &r)
// Empty cache at the end of the test.
defer cache.SubnetIDs.EmptyAllCaches()
@@ -552,11 +661,8 @@ func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) {
currEpoch := slots.ToEpoch(slot)
cache.SyncSubnetIDs.AddSyncCommitteeSubnets([]byte("pubkey"), currEpoch, []uint64{0, 1}, 10*time.Second)
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
go r.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.SyncCommitteeSubnetTopicFormat,
nse: nse,
getSubnetsToJoin: r.activeSyncSubnetIndices,
})
tfDyn := makeTestDynamicFamily(nse, p2p.SyncCommitteeSubnetTopicFormat, r.cfg.p2p.Encoding().ProtocolSuffix(), r.noopValidator, nil, r.activeSyncSubnetIndices, nil)
go r.subscribeToDynamicSubnetFamily(tfDyn)
time.Sleep(2 * time.Second)
assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics()))
topicMap := map[string]bool{}
@@ -601,11 +707,8 @@ func TestSubscribeWithSyncSubnets_DynamicSwitchFork(t *testing.T) {
require.Equal(t, [4]byte(params.BeaconConfig().DenebForkVersion), nse.ForkVersion)
require.Equal(t, params.BeaconConfig().DenebForkEpoch, nse.Epoch)
sp := newSubnetTracker(subscribeParameters{
topicFormat: p2p.SyncCommitteeSubnetTopicFormat,
nse: nse,
getSubnetsToJoin: r.activeSyncSubnetIndices,
})
tfDyn2 := makeTestDynamicFamily(nse, p2p.SyncCommitteeSubnetTopicFormat, r.cfg.p2p.Encoding().ProtocolSuffix(), r.noopValidator, nil, r.activeSyncSubnetIndices, nil)
sp := newSubnetTracker(tfDyn2)
r.trySubscribeSubnets(sp)
assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics()))
topicMap := map[string]bool{}
@@ -625,7 +728,7 @@ func TestSubscribeWithSyncSubnets_DynamicSwitchFork(t *testing.T) {
require.Equal(t, [4]byte(params.BeaconConfig().ElectraForkVersion), nse.ForkVersion)
require.Equal(t, params.BeaconConfig().ElectraForkEpoch, nse.Epoch)
sp.nse = nse
tfDyn2.nse = nse
// clear the cache and re-subscribe to subnets.
// this should result in the subscriptions being removed
cache.SyncSubnetIDs.EmptyAllCaches()

View File

@@ -0,0 +1,261 @@
package sync
import (
"fmt"
"sync"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// AttestationTopicFamily
var _ GossipsubTopicFamilyWithDynamicSubnets = (*AttestationTopicFamily)(nil)
type baseGossipsubTopicFamilyWithDynamicSubnets struct {
baseGossipsubTopicFamily
mu sync.Mutex
tracker *subnetTracker
unsubscribed bool
}
func (b *baseGossipsubTopicFamilyWithDynamicSubnets) Subscribe(tf GossipsubTopicFamilyWithDynamicSubnets) {
b.mu.Lock()
defer b.mu.Unlock()
if b.unsubscribed {
log.WithFields(logrus.Fields{
"topicFamily": fmt.Sprintf("%T", tf),
"digest": b.nse.ForkDigest,
"epoch": b.nse.Epoch,
}).Error("Cannot subscribe after unsubscribing")
return
}
b.tracker = b.syncService.subscribeToDynamicSubnetFamily(tf)
}
func (b *baseGossipsubTopicFamilyWithDynamicSubnets) Unsubscribe() {
b.mu.Lock()
defer b.mu.Unlock()
b.unsubscribed = true
if b.tracker == nil {
// Nothing was ever subscribed; avoid a nil tracker dereference.
return
}
b.syncService.pruneNotWanted(b.tracker, nil) // unsubscribe from all subnets
}
type AttestationTopicFamily struct {
baseGossipsubTopicFamilyWithDynamicSubnets
}
// NewAttestationTopicFamily creates a new AttestationTopicFamily.
func NewAttestationTopicFamily(s *Service, nse params.NetworkScheduleEntry) *AttestationTopicFamily {
attestationTopicFamily := &AttestationTopicFamily{
baseGossipsubTopicFamilyWithDynamicSubnets: baseGossipsubTopicFamilyWithDynamicSubnets{
baseGossipsubTopicFamily: baseGossipsubTopicFamily{
syncService: s,
nse: nse,
protocolSuffix: s.cfg.p2p.Encoding().ProtocolSuffix(),
},
},
}
return attestationTopicFamily
}
func (a *AttestationTopicFamily) Name() string {
return "AttestationTopicFamily"
}
// Validator returns the validator function for attestation subnets.
func (a *AttestationTopicFamily) Validator() wrappedVal {
return a.syncService.validateCommitteeIndexBeaconAttestation
}
// Handler returns the message handler for attestation subnets.
func (a *AttestationTopicFamily) Handler() subHandler {
return a.syncService.committeeIndexBeaconAttestationSubscriber
}
// GetFullTopicString builds the full topic string for an attestation subnet.
func (a *AttestationTopicFamily) GetFullTopicString(subnet uint64) string {
return fmt.Sprintf(p2p.AttestationSubnetTopicFormat, a.nse.ForkDigest, subnet) + a.protocolSuffix
}
// GetSubnetsToJoin returns persistent and aggregator subnets.
func (a *AttestationTopicFamily) GetSubnetsToJoin(slot primitives.Slot) map[uint64]bool {
return a.syncService.persistentAndAggregatorSubnetIndices(slot)
}
// GetSubnetsForBroadcast returns subnets needed for attestation duties.
func (a *AttestationTopicFamily) GetSubnetsForBroadcast(slot primitives.Slot) map[uint64]bool {
return attesterSubnetIndices(slot)
}
// GetTopicsForNode returns all topics for the given node that are relevant to this topic family.
func (a *AttestationTopicFamily) GetTopicsForNode(node *enode.Node) ([]string, error) {
return getTopicsForNode(a.syncService, a, node, p2p.AttestationSubnets)
}
func (a *AttestationTopicFamily) Subscribe() {
a.baseGossipsubTopicFamilyWithDynamicSubnets.Subscribe(a)
}
func (a *AttestationTopicFamily) Unsubscribe() {
a.baseGossipsubTopicFamilyWithDynamicSubnets.Unsubscribe()
}
// SyncCommitteeTopicFamily
var _ GossipsubTopicFamilyWithDynamicSubnets = (*SyncCommitteeTopicFamily)(nil)
type SyncCommitteeTopicFamily struct {
baseGossipsubTopicFamilyWithDynamicSubnets
}
// NewSyncCommitteeTopicFamily creates a new SyncCommitteeTopicFamily.
func NewSyncCommitteeTopicFamily(s *Service, nse params.NetworkScheduleEntry) *SyncCommitteeTopicFamily {
return &SyncCommitteeTopicFamily{
baseGossipsubTopicFamilyWithDynamicSubnets: baseGossipsubTopicFamilyWithDynamicSubnets{
baseGossipsubTopicFamily: baseGossipsubTopicFamily{
syncService: s,
nse: nse,
protocolSuffix: s.cfg.p2p.Encoding().ProtocolSuffix(),
},
},
}
}
func (s *SyncCommitteeTopicFamily) Name() string {
return "SyncCommitteeTopicFamily"
}
// Validator returns the validator function for sync committee subnets.
func (s *SyncCommitteeTopicFamily) Validator() wrappedVal {
return s.syncService.validateSyncCommitteeMessage
}
// Handler returns the message handler for sync committee subnets.
func (s *SyncCommitteeTopicFamily) Handler() subHandler {
return s.syncService.syncCommitteeMessageSubscriber
}
// GetFullTopicString builds the full topic string for a sync committee subnet.
func (s *SyncCommitteeTopicFamily) GetFullTopicString(subnet uint64) string {
return fmt.Sprintf(p2p.SyncCommitteeSubnetTopicFormat, s.nse.ForkDigest, subnet) + s.protocolSuffix
}
// GetSubnetsToJoin returns active sync committee subnets.
func (s *SyncCommitteeTopicFamily) GetSubnetsToJoin(slot primitives.Slot) map[uint64]bool {
return s.syncService.activeSyncSubnetIndices(slot)
}
// GetSubnetsForBroadcast returns nil as there are no separate peer requirements.
func (s *SyncCommitteeTopicFamily) GetSubnetsForBroadcast(slot primitives.Slot) map[uint64]bool {
return nil
}
// GetTopicsForNode returns all topics for the given node that are relevant to this topic family.
func (s *SyncCommitteeTopicFamily) GetTopicsForNode(node *enode.Node) ([]string, error) {
return getTopicsForNode(s.syncService, s, node, p2p.SyncSubnets)
}
func (s *SyncCommitteeTopicFamily) Subscribe() {
s.baseGossipsubTopicFamilyWithDynamicSubnets.Subscribe(s)
}
func (s *SyncCommitteeTopicFamily) Unsubscribe() {
s.baseGossipsubTopicFamilyWithDynamicSubnets.Unsubscribe()
}
// DataColumnTopicFamily
var _ GossipsubTopicFamilyWithDynamicSubnets = (*DataColumnTopicFamily)(nil)
type DataColumnTopicFamily struct {
baseGossipsubTopicFamilyWithDynamicSubnets
}
// NewDataColumnTopicFamily creates a new DataColumnTopicFamily.
func NewDataColumnTopicFamily(s *Service, nse params.NetworkScheduleEntry) *DataColumnTopicFamily {
return &DataColumnTopicFamily{
baseGossipsubTopicFamilyWithDynamicSubnets: baseGossipsubTopicFamilyWithDynamicSubnets{
baseGossipsubTopicFamily: baseGossipsubTopicFamily{
syncService: s,
nse: nse,
protocolSuffix: s.cfg.p2p.Encoding().ProtocolSuffix(),
},
},
}
}
func (d *DataColumnTopicFamily) Name() string {
return "DataColumnTopicFamily"
}
// Validator returns the validator function for data column subnets.
func (d *DataColumnTopicFamily) Validator() wrappedVal {
return d.syncService.validateDataColumn
}
// Handler returns the message handler for data column subnets.
func (d *DataColumnTopicFamily) Handler() subHandler {
return d.syncService.dataColumnSubscriber
}
// GetFullTopicString builds the full topic string for a data column subnet.
func (d *DataColumnTopicFamily) GetFullTopicString(subnet uint64) string {
return fmt.Sprintf(p2p.DataColumnSubnetTopicFormat, d.nse.ForkDigest, subnet) + d.protocolSuffix
}
// GetSubnetsToJoin returns data column subnets.
func (d *DataColumnTopicFamily) GetSubnetsToJoin(slot primitives.Slot) map[uint64]bool {
return d.syncService.dataColumnSubnetIndices(slot)
}
// GetSubnetsForBroadcast returns all data column subnets.
func (d *DataColumnTopicFamily) GetSubnetsForBroadcast(slot primitives.Slot) map[uint64]bool {
return d.syncService.allDataColumnSubnets(slot)
}
// GetTopicsForNode returns all topics for the given node that are relevant to this topic family.
func (d *DataColumnTopicFamily) GetTopicsForNode(node *enode.Node) ([]string, error) {
return getTopicsForNode(d.syncService, d, node, p2p.DataColumnSubnets)
}
func (d *DataColumnTopicFamily) Subscribe() {
d.baseGossipsubTopicFamilyWithDynamicSubnets.Subscribe(d)
}
func (d *DataColumnTopicFamily) Unsubscribe() {
d.baseGossipsubTopicFamilyWithDynamicSubnets.Unsubscribe()
}
type nodeSubnetExtractor func(id enode.ID, n *enode.Node, r *enr.Record) (map[uint64]bool, error)
func getTopicsForNode(
s *Service,
tf GossipsubTopicFamilyWithDynamicSubnets,
node *enode.Node,
extractor nodeSubnetExtractor,
) ([]string, error) {
if node == nil {
return nil, errors.New("enode is nil")
}
currentSlot := s.cfg.clock.CurrentSlot()
neededSubnets := computeAllNeededSubnets(
currentSlot,
tf,
)
nodeSubnets, err := extractor(node.ID(), node, node.Record())
if err != nil {
return nil, err
}
var topics []string
for subnet := range neededSubnets {
if nodeSubnets[subnet] {
topics = append(topics, tf.GetFullTopicString(subnet))
}
}
return topics, nil
}

View File

@@ -0,0 +1,51 @@
package sync
import (
"fmt"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v6/config/params"
)
var _ GossipsubTopicFamilyWithoutDynamicSubnets = (*BlobTopicFamily)(nil)
// BlobTopicFamily represents a static-subnet family instance for a specific blob subnet index.
type BlobTopicFamily struct {
baseGossipsubTopicFamily
subnetIndex uint64
}
func NewBlobTopicFamily(s *Service, nse params.NetworkScheduleEntry, subnetIndex uint64) *BlobTopicFamily {
return &BlobTopicFamily{
baseGossipsubTopicFamily{
syncService: s,
nse: nse,
protocolSuffix: s.cfg.p2p.Encoding().ProtocolSuffix(),
},
subnetIndex,
}
}
func (b *BlobTopicFamily) Name() string {
return fmt.Sprintf("BlobTopicFamily-%d", b.subnetIndex)
}
func (b *BlobTopicFamily) Validator() wrappedVal {
return b.syncService.validateBlob
}
func (b *BlobTopicFamily) Handler() subHandler {
return b.syncService.blobSubscriber
}
func (b *BlobTopicFamily) GetFullTopicString() string {
return fmt.Sprintf(p2p.BlobSubnetTopicFormat, b.nse.ForkDigest, b.subnetIndex) + b.protocolSuffix
}
func (b *BlobTopicFamily) Subscribe() {
b.syncService.subscribe(b.GetFullTopicString(), b.Validator(), b.Handler())
}
func (b *BlobTopicFamily) Unsubscribe() {
b.syncService.unSubscribeFromTopic(b.GetFullTopicString())
}
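Because each BlobTopicFamily is pinned to a single subnet index, the controller presumably creates one family per blob subnet for the active fork, mirroring the mapFromCount logic removed from registerSubscribers. A rough sketch under that assumption, where s is the sync *Service and nse the current params.NetworkScheduleEntry (the actual controller wiring lives in a file not shown in this view):
// Sketch only: one static blob topic family per subnet index.
// The subnet counts come from the removed registerSubscribers branches for Deneb and Electra.
count := params.BeaconConfig().BlobsidecarSubnetCount
if nse.Epoch >= params.BeaconConfig().ElectraForkEpoch {
count = params.BeaconConfig().BlobsidecarSubnetCountElectra
}
for i := uint64(0); i < count; i++ {
NewBlobTopicFamily(s, nse, i).Subscribe()
}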

View File

@@ -0,0 +1,366 @@
package sync
import (
"fmt"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v6/config/params"
)
// Blocks
var _ GossipsubTopicFamilyWithoutDynamicSubnets = (*BlockTopicFamily)(nil)
type BlockTopicFamily struct {
baseGossipsubTopicFamily
}
func NewBlockTopicFamily(s *Service, nse params.NetworkScheduleEntry) *BlockTopicFamily {
return &BlockTopicFamily{
baseGossipsubTopicFamily{
syncService: s,
nse: nse,
protocolSuffix: s.cfg.p2p.Encoding().ProtocolSuffix()},
}
}
func (b *BlockTopicFamily) Name() string {
return "BlockTopicFamily"
}
func (b *BlockTopicFamily) Validator() wrappedVal {
return b.syncService.validateBeaconBlockPubSub
}
func (b *BlockTopicFamily) Handler() subHandler {
return b.syncService.beaconBlockSubscriber
}
func (b *BlockTopicFamily) GetFullTopicString() string {
return fmt.Sprintf(p2p.BlockSubnetTopicFormat, b.nse.ForkDigest) + b.protocolSuffix
}
func (b *BlockTopicFamily) Subscribe() {
b.syncService.subscribe(b.GetFullTopicString(), b.Validator(), b.Handler())
}
func (b *BlockTopicFamily) Unsubscribe() {
b.syncService.unSubscribeFromTopic(b.GetFullTopicString())
}
// Aggregate and Proof
var _ GossipsubTopicFamilyWithoutDynamicSubnets = (*AggregateAndProofTopicFamily)(nil)
type AggregateAndProofTopicFamily struct {
baseGossipsubTopicFamily
}
func NewAggregateAndProofTopicFamily(s *Service, nse params.NetworkScheduleEntry) *AggregateAndProofTopicFamily {
return &AggregateAndProofTopicFamily{
baseGossipsubTopicFamily{
syncService: s,
nse: nse,
protocolSuffix: s.cfg.p2p.Encoding().ProtocolSuffix()},
}
}
func (a *AggregateAndProofTopicFamily) Name() string {
return "AggregateAndProofTopicFamily"
}
func (a *AggregateAndProofTopicFamily) Validator() wrappedVal {
return a.syncService.validateAggregateAndProof
}
func (a *AggregateAndProofTopicFamily) Handler() subHandler {
return a.syncService.beaconAggregateProofSubscriber
}
func (a *AggregateAndProofTopicFamily) GetFullTopicString() string {
return fmt.Sprintf(p2p.AggregateAndProofSubnetTopicFormat, a.nse.ForkDigest) + a.protocolSuffix
}
func (a *AggregateAndProofTopicFamily) Subscribe() {
a.syncService.subscribe(a.GetFullTopicString(), a.Validator(), a.Handler())
}
func (a *AggregateAndProofTopicFamily) Unsubscribe() {
a.syncService.unSubscribeFromTopic(a.GetFullTopicString())
}
// Voluntary Exit
var _ GossipsubTopicFamilyWithoutDynamicSubnets = (*VoluntaryExitTopicFamily)(nil)
type VoluntaryExitTopicFamily struct {
baseGossipsubTopicFamily
}
func NewVoluntaryExitTopicFamily(s *Service, nse params.NetworkScheduleEntry) *VoluntaryExitTopicFamily {
return &VoluntaryExitTopicFamily{
baseGossipsubTopicFamily{
syncService: s,
nse: nse,
protocolSuffix: s.cfg.p2p.Encoding().ProtocolSuffix()},
}
}
func (v *VoluntaryExitTopicFamily) Name() string {
return "VoluntaryExitTopicFamily"
}
func (v *VoluntaryExitTopicFamily) Validator() wrappedVal {
return v.syncService.validateVoluntaryExit
}
func (v *VoluntaryExitTopicFamily) Handler() subHandler {
return v.syncService.voluntaryExitSubscriber
}
func (v *VoluntaryExitTopicFamily) GetFullTopicString() string {
return fmt.Sprintf(p2p.ExitSubnetTopicFormat, v.nse.ForkDigest) + v.protocolSuffix
}
func (v *VoluntaryExitTopicFamily) Subscribe() {
v.syncService.subscribe(v.GetFullTopicString(), v.Validator(), v.Handler())
}
func (v *VoluntaryExitTopicFamily) Unsubscribe() {
v.syncService.unSubscribeFromTopic(v.GetFullTopicString())
}
// Proposer Slashing
var _ GossipsubTopicFamilyWithoutDynamicSubnets = (*ProposerSlashingTopicFamily)(nil)
type ProposerSlashingTopicFamily struct {
baseGossipsubTopicFamily
}
func NewProposerSlashingTopicFamily(s *Service, nse params.NetworkScheduleEntry) *ProposerSlashingTopicFamily {
return &ProposerSlashingTopicFamily{
baseGossipsubTopicFamily{
syncService: s,
nse: nse,
protocolSuffix: s.cfg.p2p.Encoding().ProtocolSuffix()},
}
}
func (p *ProposerSlashingTopicFamily) Name() string {
return "ProposerSlashingTopicFamily"
}
func (p *ProposerSlashingTopicFamily) Validator() wrappedVal {
return p.syncService.validateProposerSlashing
}
func (p *ProposerSlashingTopicFamily) Handler() subHandler {
return p.syncService.proposerSlashingSubscriber
}
func (p *ProposerSlashingTopicFamily) GetFullTopicString() string {
return fmt.Sprintf(p2p.ProposerSlashingSubnetTopicFormat, p.nse.ForkDigest) + p.protocolSuffix
}
func (p *ProposerSlashingTopicFamily) Subscribe() {
p.syncService.subscribe(p.GetFullTopicString(), p.Validator(), p.Handler())
}
func (p *ProposerSlashingTopicFamily) Unsubscribe() {
p.syncService.unSubscribeFromTopic(p.GetFullTopicString())
}
// Attester Slashing
var _ GossipsubTopicFamilyWithoutDynamicSubnets = (*AttesterSlashingTopicFamily)(nil)
type AttesterSlashingTopicFamily struct {
baseGossipsubTopicFamily
}
func NewAttesterSlashingTopicFamily(s *Service, nse params.NetworkScheduleEntry) *AttesterSlashingTopicFamily {
return &AttesterSlashingTopicFamily{
baseGossipsubTopicFamily{
syncService: s,
nse: nse,
protocolSuffix: s.cfg.p2p.Encoding().ProtocolSuffix()},
}
}
func (a *AttesterSlashingTopicFamily) Name() string {
return "AttesterSlashingTopicFamily"
}
func (a *AttesterSlashingTopicFamily) Validator() wrappedVal {
return a.syncService.validateAttesterSlashing
}
func (a *AttesterSlashingTopicFamily) Handler() subHandler {
return a.syncService.attesterSlashingSubscriber
}
func (a *AttesterSlashingTopicFamily) GetFullTopicString() string {
return fmt.Sprintf(p2p.AttesterSlashingSubnetTopicFormat, a.nse.ForkDigest) + a.protocolSuffix
}
// TODO: Do we really need to spawn goroutines here?
func (a *AttesterSlashingTopicFamily) Subscribe() {
a.syncService.subscribe(a.GetFullTopicString(), a.Validator(), a.Handler())
}
func (a *AttesterSlashingTopicFamily) Unsubscribe() {
a.syncService.unSubscribeFromTopic(a.GetFullTopicString())
}
// Sync Contribution and Proof (Altair+)
var _ GossipsubTopicFamilyWithoutDynamicSubnets = (*SyncContributionAndProofTopicFamily)(nil)
type SyncContributionAndProofTopicFamily struct{ baseGossipsubTopicFamily }
func NewSyncContributionAndProofTopicFamily(s *Service, nse params.NetworkScheduleEntry) *SyncContributionAndProofTopicFamily {
return &SyncContributionAndProofTopicFamily{
baseGossipsubTopicFamily{
syncService: s,
nse: nse,
protocolSuffix: s.cfg.p2p.Encoding().ProtocolSuffix()},
}
}
func (sc *SyncContributionAndProofTopicFamily) Name() string {
return "SyncContributionAndProofTopicFamily"
}
func (sc *SyncContributionAndProofTopicFamily) Validator() wrappedVal {
return sc.syncService.validateSyncContributionAndProof
}
func (sc *SyncContributionAndProofTopicFamily) Handler() subHandler {
return sc.syncService.syncContributionAndProofSubscriber
}
func (sc *SyncContributionAndProofTopicFamily) GetFullTopicString() string {
return fmt.Sprintf(p2p.SyncContributionAndProofSubnetTopicFormat, sc.nse.ForkDigest) + sc.protocolSuffix
}
func (sc *SyncContributionAndProofTopicFamily) Subscribe() {
sc.syncService.subscribe(sc.GetFullTopicString(), sc.Validator(), sc.Handler())
}
func (sc *SyncContributionAndProofTopicFamily) Unsubscribe() {
sc.syncService.unSubscribeFromTopic(sc.GetFullTopicString())
}
// Light Client Optimistic Update (Altair+)
var _ GossipsubTopicFamilyWithoutDynamicSubnets = (*LightClientOptimisticUpdateTopicFamily)(nil)
type LightClientOptimisticUpdateTopicFamily struct {
baseGossipsubTopicFamily
}
func NewLightClientOptimisticUpdateTopicFamily(s *Service, nse params.NetworkScheduleEntry) *LightClientOptimisticUpdateTopicFamily {
return &LightClientOptimisticUpdateTopicFamily{
baseGossipsubTopicFamily{
syncService: s,
nse: nse,
protocolSuffix: s.cfg.p2p.Encoding().ProtocolSuffix()},
}
}
func (l *LightClientOptimisticUpdateTopicFamily) Name() string {
return "LightClientOptimisticUpdateTopicFamily"
}
func (l *LightClientOptimisticUpdateTopicFamily) Validator() wrappedVal {
return l.syncService.validateLightClientOptimisticUpdate
}
func (l *LightClientOptimisticUpdateTopicFamily) Handler() subHandler {
return noopHandler
}
func (l *LightClientOptimisticUpdateTopicFamily) GetFullTopicString() string {
return fmt.Sprintf(p2p.LightClientOptimisticUpdateTopicFormat, l.nse.ForkDigest) + l.protocolSuffix
}
func (l *LightClientOptimisticUpdateTopicFamily) Subscribe() {
l.syncService.subscribe(l.GetFullTopicString(), l.Validator(), l.Handler())
}
func (l *LightClientOptimisticUpdateTopicFamily) Unsubscribe() {
l.syncService.unSubscribeFromTopic(l.GetFullTopicString())
}
// Light Client Finality Update (Altair+)
var _ GossipsubTopicFamilyWithoutDynamicSubnets = (*LightClientFinalityUpdateTopicFamily)(nil)
type LightClientFinalityUpdateTopicFamily struct {
baseGossipsubTopicFamily
}
func NewLightClientFinalityUpdateTopicFamily(s *Service, nse params.NetworkScheduleEntry) *LightClientFinalityUpdateTopicFamily {
return &LightClientFinalityUpdateTopicFamily{
baseGossipsubTopicFamily{
syncService: s,
nse: nse,
protocolSuffix: s.cfg.p2p.Encoding().ProtocolSuffix()},
}
}
func (l *LightClientFinalityUpdateTopicFamily) Name() string {
return "LightClientFinalityUpdateTopicFamily"
}
func (l *LightClientFinalityUpdateTopicFamily) Validator() wrappedVal {
return l.syncService.validateLightClientFinalityUpdate
}
func (l *LightClientFinalityUpdateTopicFamily) Handler() subHandler {
return noopHandler
}
func (l *LightClientFinalityUpdateTopicFamily) GetFullTopicString() string {
return fmt.Sprintf(p2p.LightClientFinalityUpdateTopicFormat, l.nse.ForkDigest) + l.protocolSuffix
}
func (l *LightClientFinalityUpdateTopicFamily) Subscribe() {
l.syncService.subscribe(l.GetFullTopicString(), l.Validator(), l.Handler())
}
func (l *LightClientFinalityUpdateTopicFamily) Unsubscribe() {
l.syncService.unSubscribeFromTopic(l.GetFullTopicString())
}
// BLS to Execution Change (Capella+)
var _ GossipsubTopicFamilyWithoutDynamicSubnets = (*BlsToExecutionChangeTopicFamily)(nil)
type BlsToExecutionChangeTopicFamily struct {
baseGossipsubTopicFamily
}
func NewBlsToExecutionChangeTopicFamily(s *Service, nse params.NetworkScheduleEntry) *BlsToExecutionChangeTopicFamily {
return &BlsToExecutionChangeTopicFamily{
baseGossipsubTopicFamily{
syncService: s,
nse: nse,
protocolSuffix: s.cfg.p2p.Encoding().ProtocolSuffix()},
}
}
func (b *BlsToExecutionChangeTopicFamily) Name() string {
return "BlsToExecutionChangeTopicFamily"
}
func (b *BlsToExecutionChangeTopicFamily) Validator() wrappedVal {
return b.syncService.validateBlsToExecutionChange
}
func (b *BlsToExecutionChangeTopicFamily) Handler() subHandler {
return b.syncService.blsToExecutionChangeSubscriber
}
func (b *BlsToExecutionChangeTopicFamily) GetFullTopicString() string {
return fmt.Sprintf(p2p.BlsToExecutionChangeSubnetTopicFormat, b.nse.ForkDigest) + b.protocolSuffix
}
func (b *BlsToExecutionChangeTopicFamily) Subscribe() {
b.syncService.subscribe(b.GetFullTopicString(), b.Validator(), b.Handler())
}
func (b *BlsToExecutionChangeTopicFamily) Unsubscribe() {
b.syncService.unSubscribeFromTopic(b.GetFullTopicString())
}

View File

@@ -0,0 +1,4 @@
### Added
- A Gossipsub controller that uses well-defined interfaces and implementations for ALL topics (subnetted and otherwise)
that Prysm subscribes to, hiding the gnarly details of gossipsub subscription management.
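
For reference, the approximate shape of the two topic-family interfaces, inferred from the implementations and test doubles in this diff (the actual definitions live in the new gossipsub controller file, which is not part of this compare view):
// Inferred sketch only; method sets reconstructed from the implementations above.
type GossipsubTopicFamilyWithoutDynamicSubnets interface {
Name() string
Validator() wrappedVal
Handler() subHandler
GetFullTopicString() string
Subscribe()
Unsubscribe()
}
type GossipsubTopicFamilyWithDynamicSubnets interface {
Name() string
Validator() wrappedVal
Handler() subHandler
NetworkScheduleEntry() params.NetworkScheduleEntry
GetFullTopicString(subnet uint64) string
GetSubnetsToJoin(currentSlot primitives.Slot) map[uint64]bool
GetSubnetsForBroadcast(currentSlot primitives.Slot) map[uint64]bool
GetTopicsForNode(node *enode.Node) ([]string, error)
Subscribe()
Unsubscribe()
}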