make registerSubscribers idempotent (#15779)

* make registerSubscribers idempotent

* clean up debugging changes

* test fix

* rm unused var

* sobbing noises

* naming feedback and separate test for digestActionDone

* gazelle

* manu's feedback

* refactor to enable immediate sub after init sync

* preston comment re panic causing db corruption risk

* ensure we check that we're 1 epoch past the fork

* manu feedback

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
This commit is contained in:
kasey
2025-10-07 20:09:22 -05:00
committed by GitHub
parent 06b5409ff0
commit 0d742c6f88
12 changed files with 363 additions and 276 deletions

View File

@@ -17,6 +17,7 @@ go_library(
"fuzz_exports.go", # keep
"log.go",
"metrics.go",
"once.go",
"options.go",
"pending_attestations_queue.go",
"pending_blocks_queue.go",
@@ -172,6 +173,7 @@ go_test(
"error_test.go",
"fork_watcher_test.go",
"kzg_batch_verifier_test.go",
"once_test.go",
"pending_attestations_queue_test.go",
"pending_blocks_queue_test.go",
"rate_limiter_test.go",

View File

@@ -2,7 +2,6 @@ package sync
import (
"bytes"
"errors"
"io"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
@@ -12,6 +11,7 @@ import (
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/network"
multiplex "github.com/libp2p/go-mplex"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@@ -38,7 +38,7 @@ func ReadStatusCode(stream network.Stream, encoding encoder.NetworkEncoding) (ui
b := make([]byte, 1)
_, err := stream.Read(b)
if err != nil {
return 0, "", err
return 0, "", errors.Wrap(err, "stream read")
}
if b[0] == responseCodeSuccess {
@@ -52,7 +52,7 @@ func ReadStatusCode(stream network.Stream, encoding encoder.NetworkEncoding) (ui
SetStreamReadDeadline(stream, params.BeaconConfig().RespTimeoutDuration())
msg := &types.ErrorMessage{}
if err := encoding.DecodeWithMaxLength(stream, msg); err != nil {
return 0, "", err
return 0, "", errors.Wrap(err, "decode error message")
}
return b[0], string(*msg), nil

View File

@@ -9,23 +9,28 @@ import (
"github.com/pkg/errors"
)
// Is a background routine that observes for new incoming forks. Depending on the epoch
// it will be in charge of subscribing/unsubscribing the relevant topics at the fork boundaries.
func (s *Service) forkWatcher() {
<-s.initialSyncComplete
// p2pHandlerControlLoop runs in a continuous loop to ensure that:
// - We are subscribed to the correct gossipsub topics (for the current and upcoming epoch).
// - We have registered the correct RPC stream handlers (for the current and upcoming epoch).
// - We have cleaned up gossipsub topics and RPC stream handlers that are no longer needed.
func (s *Service) p2pHandlerControlLoop() {
// At startup, launch registration and peer discovery loops, and register rpc stream handlers.
startEntry := params.GetNetworkScheduleEntry(s.cfg.clock.CurrentEpoch())
s.registerSubscribers(startEntry)
slotTicker := slots.NewSlotTicker(s.cfg.clock.GenesisTime(), params.BeaconConfig().SecondsPerSlot)
for {
select {
// In the event of a node restart, we will still end up subscribing to the correct
// topics during/after the fork epoch. This routine is to ensure correct
// subscriptions for nodes running before a fork epoch.
case currSlot := <-slotTicker.C():
currEpoch := slots.ToEpoch(currSlot)
if err := s.registerForUpcomingFork(currEpoch); err != nil {
case <-slotTicker.C():
current := s.cfg.clock.CurrentEpoch()
if err := s.ensureRegistrationsForEpoch(current); err != nil {
log.WithError(err).Error("Unable to check for fork in the next epoch")
continue
}
if err := s.deregisterFromPastFork(currEpoch); err != nil {
if err := s.ensureDeregistrationForEpoch(current); err != nil {
log.WithError(err).Error("Unable to check for fork in the previous epoch")
continue
}
@@ -37,102 +42,90 @@ func (s *Service) forkWatcher() {
}
}
// registerForUpcomingFork registers appropriate gossip and RPC topic if there is a fork in the next epoch.
func (s *Service) registerForUpcomingFork(currentEpoch primitives.Epoch) error {
nextEntry := params.GetNetworkScheduleEntry(currentEpoch + 1)
// Check if there is a fork in the next epoch.
if nextEntry.ForkDigest == s.registeredNetworkEntry.ForkDigest {
return nil
}
// ensureRegistrationsForEpoch ensures that gossip topic and RPC stream handler
// registrations are in place for the current and subsequent epoch.
func (s *Service) ensureRegistrationsForEpoch(epoch primitives.Epoch) error {
current := params.GetNetworkScheduleEntry(epoch)
s.registerSubscribers(current)
if s.subHandler.digestExists(nextEntry.ForkDigest) {
return nil
}
// Register the subscribers (gossipsub) for the next epoch.
s.registerSubscribers(nextEntry.Epoch, nextEntry.ForkDigest)
// Get the handlers for the current and next fork.
currentHandler, err := s.rpcHandlerByTopicFromEpoch(currentEpoch)
currentHandler, err := s.rpcHandlerByTopicFromFork(current.VersionEnum)
if err != nil {
return errors.Wrap(err, "RPC handler by topic from before fork epoch")
}
if !s.digestActionDone(current.ForkDigest, registerRpcOnce) {
for topic, handler := range currentHandler {
s.registerRPC(topic, handler)
}
}
nextHandler, err := s.rpcHandlerByTopicFromEpoch(nextEntry.Epoch)
next := params.GetNetworkScheduleEntry(epoch + 1)
if current.Epoch == next.Epoch {
return nil // no fork in the next epoch
}
s.registerSubscribers(next)
if s.digestActionDone(next.ForkDigest, registerRpcOnce) {
return nil
}
nextHandler, err := s.rpcHandlerByTopicFromFork(next.VersionEnum)
if err != nil {
return errors.Wrap(err, "RPC handler by topic from fork epoch")
}
// Compute newly added topics.
newHandlersByTopic := addedRPCHandlerByTopic(currentHandler, nextHandler)
// Register the new RPC handlers.
// We deregister the old topics later, at least one epoch after the fork.
for topic, handler := range newHandlersByTopic {
s.registerRPC(topic, handler)
}
s.registeredNetworkEntry = nextEntry
return nil
}
// deregisterFromPastFork deregisters appropriate gossip and RPC topic if there is a fork in the current epoch.
func (s *Service) deregisterFromPastFork(currentEpoch primitives.Epoch) error {
// Get the fork.
currentFork, err := params.Fork(currentEpoch)
if err != nil {
return errors.Wrap(err, "genesis validators root")
}
// ensureDeregistrationForEpoch deregisters appropriate gossip and RPC topic if there is a fork in the current epoch.
func (s *Service) ensureDeregistrationForEpoch(currentEpoch primitives.Epoch) error {
current := params.GetNetworkScheduleEntry(currentEpoch)
// If we are still in our genesis fork version then exit early.
if currentFork.Epoch == params.BeaconConfig().GenesisEpoch {
if current.Epoch == params.BeaconConfig().GenesisEpoch {
return nil
}
if currentEpoch < current.Epoch+1 {
return nil // wait until we are 1 epoch into the fork
}
// Get the epoch after the fork epoch.
afterForkEpoch := currentFork.Epoch + 1
previous := params.GetNetworkScheduleEntry(current.Epoch - 1)
// Remove stream handlers for all topics that are in the set of
// currentTopics-previousTopics
if !s.digestActionDone(previous.ForkDigest, unregisterRpcOnce) {
previousTopics, err := s.rpcHandlerByTopicFromFork(previous.VersionEnum)
if err != nil {
return errors.Wrap(err, "RPC handler by topic from before fork epoch")
}
currentTopics, err := s.rpcHandlerByTopicFromFork(current.VersionEnum)
if err != nil {
return errors.Wrap(err, "RPC handler by topic from fork epoch")
}
topicsToRemove := removedRPCTopics(previousTopics, currentTopics)
for topic := range topicsToRemove {
fullTopic := topic + s.cfg.p2p.Encoding().ProtocolSuffix()
s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullTopic))
log.WithField("topic", fullTopic).Debug("Removed RPC handler")
}
}
// Start de-registering if the current epoch is after the fork epoch.
if currentEpoch != afterForkEpoch {
// Unsubscribe from all gossip topics with the previous fork digest.
if s.digestActionDone(previous.ForkDigest, unregisterGossipOnce) {
return nil
}
// Look at the previous fork's digest.
beforeForkEpoch := currentFork.Epoch - 1
beforeForkDigest := params.ForkDigest(beforeForkEpoch)
// Exit early if there are no topics with that particular digest.
if !s.subHandler.digestExists(beforeForkDigest) {
return nil
}
// Compute the RPC handlers that are no longer needed.
beforeForkHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(beforeForkEpoch)
if err != nil {
return errors.Wrap(err, "RPC handler by topic from before fork epoch")
}
forkHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(currentFork.Epoch)
if err != nil {
return errors.Wrap(err, "RPC handler by topic from fork epoch")
}
topicsToRemove := removedRPCTopics(beforeForkHandlerByTopic, forkHandlerByTopic)
for topic := range topicsToRemove {
fullTopic := topic + s.cfg.p2p.Encoding().ProtocolSuffix()
s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullTopic))
log.WithField("topic", fullTopic).Debug("Removed RPC handler")
}
// Run through all our current active topics and see
// if there are any subscriptions to be removed.
for _, t := range s.subHandler.allTopics() {
retDigest, err := p2p.ExtractGossipDigest(t)
if err != nil {
log.WithError(err).Error("Could not retrieve digest")
continue
}
if retDigest == beforeForkDigest {
if retDigest == previous.ForkDigest {
s.unSubscribeFromTopic(t)
}
}

View File

@@ -50,12 +50,36 @@ func testForkWatcherService(t *testing.T, current primitives.Epoch) *Service {
return r
}
func TestRegisterSubscriptions_Idempotent(t *testing.T) {
params.SetupTestConfigCleanup(t)
genesis.StoreEmbeddedDuringTest(t, params.BeaconConfig().ConfigName)
fulu := params.BeaconConfig().ElectraForkEpoch + 4096*2
params.BeaconConfig().FuluForkEpoch = fulu
params.BeaconConfig().InitializeForkSchedule()
current := fulu - 1
s := testForkWatcherService(t, current)
next := params.GetNetworkScheduleEntry(fulu)
wg := attachSpawner(s)
require.Equal(t, true, s.registerSubscribers(next))
done := make(chan struct{})
go func() { wg.Wait(); close(done) }()
select {
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for subscriptions to be registered")
case <-done:
}
// the goal of this callback is just to assert that spawn is never called.
s.subscriptionSpawner = func(func()) { t.Error("registration routines spawned twice for the same digest") }
require.NoError(t, s.ensureRegistrationsForEpoch(fulu))
}
func TestService_CheckForNextEpochFork(t *testing.T) {
closedChan := make(chan struct{})
close(closedChan)
params.SetupTestConfigCleanup(t)
genesis.StoreEmbeddedDuringTest(t, params.BeaconConfig().ConfigName)
params.BeaconConfig().FuluForkEpoch = params.BeaconConfig().ElectraForkEpoch + 1096*2
params.BeaconConfig().FuluForkEpoch = params.BeaconConfig().ElectraForkEpoch + 4096*2
params.BeaconConfig().InitializeForkSchedule()
tests := []struct {
@@ -171,7 +195,7 @@ func TestService_CheckForNextEpochFork(t *testing.T) {
current := tt.epochAtRegistration(tt.forkEpoch)
s := testForkWatcherService(t, current)
wg := attachSpawner(s)
require.NoError(t, s.registerForUpcomingFork(s.cfg.clock.CurrentEpoch()))
require.NoError(t, s.ensureRegistrationsForEpoch(s.cfg.clock.CurrentEpoch()))
wg.Wait()
tt.checkRegistration(t, s)
@@ -193,10 +217,13 @@ func TestService_CheckForNextEpochFork(t *testing.T) {
// Move the clock to just before the next fork epoch and ensure deregistration is correct
wg = attachSpawner(s)
s.cfg.clock = defaultClockWithTimeAtEpoch(tt.nextForkEpoch - 1)
require.NoError(t, s.registerForUpcomingFork(s.cfg.clock.CurrentEpoch()))
require.NoError(t, s.ensureRegistrationsForEpoch(s.cfg.clock.CurrentEpoch()))
wg.Wait()
require.NoError(t, s.ensureDeregistrationForEpoch(tt.nextForkEpoch))
assert.Equal(t, true, s.subHandler.digestExists(digest))
// deregister as if it is the epoch after the next fork epoch
require.NoError(t, s.deregisterFromPastFork(tt.nextForkEpoch+1))
require.NoError(t, s.ensureDeregistrationForEpoch(tt.nextForkEpoch+1))
assert.Equal(t, false, s.subHandler.digestExists(digest))
assert.Equal(t, true, s.subHandler.digestExists(nextDigest))
})

40
beacon-chain/sync/once.go Normal file
View File

@@ -0,0 +1,40 @@
package sync
import "sync"
// oncePerDigest represents an action that should only be performed once per fork digest.
type oncePerDigest uint8
const (
doneZero oncePerDigest = 0
registerGossipOnce oncePerDigest = 1 << 0
unregisterGossipOnce oncePerDigest = 1 << 1
registerRpcOnce oncePerDigest = 1 << 2
unregisterRpcOnce oncePerDigest = 1 << 3
)
// perDigestSet keeps track of which oncePerDigest actions
// have been performed for each fork digest.
type perDigestSet struct {
sync.Mutex
history map[[4]byte]oncePerDigest
}
// digestActionDone marks the action as done for the given digest, returning true if it was already done.
func (s *Service) digestActionDone(digest [4]byte, action oncePerDigest) bool {
s.digestActions.Lock()
defer s.digestActions.Unlock()
// lazy initialize registrationHistory; the lock is not a reference type so it is ready to go
if s.digestActions.history == nil {
s.digestActions.history = make(map[[4]byte]oncePerDigest)
}
prev := s.digestActions.history[digest]
// Return true if the bit was already set
if prev&action != 0 {
return true
}
s.digestActions.history[digest] = prev | action
return false
}

View File

@@ -0,0 +1,40 @@
package sync
import (
"fmt"
"slices"
"testing"
)
func TestDigestActionDone(t *testing.T) {
digests := [][4]byte{
{0, 0, 0, 0},
{1, 2, 3, 4},
{4, 3, 2, 1},
}
actions := []oncePerDigest{
registerGossipOnce,
unregisterGossipOnce,
registerRpcOnce,
unregisterRpcOnce,
}
testCombos := func(d [][4]byte, a []oncePerDigest) {
s := &Service{}
for _, digest := range d {
for _, action := range a {
t.Run(fmt.Sprintf("digest=%#x/action=%d", digest, action), func(t *testing.T) {
if s.digestActionDone(digest, action) {
t.Fatal("expected first call to return false")
}
if !s.digestActionDone(digest, action) {
t.Fatal("expected second call to return true")
}
})
}
}
}
testCombos(digests, actions)
slices.Reverse(digests)
slices.Reverse(actions)
testCombos(digests, actions)
}

View File

@@ -11,11 +11,9 @@ import (
p2ptypes "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v6/config/features"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/time/slots"
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/network"
"github.com/pkg/errors"
@@ -122,41 +120,9 @@ func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandle
return nil, errors.Errorf("RPC handler not found for fork index %d", forkIndex)
}
// rpcHandlerByTopic returns the RPC handlers for a given epoch.
func (s *Service) rpcHandlerByTopicFromEpoch(epoch primitives.Epoch) (map[string]rpcHandler, error) {
// Get the beacon config.
beaconConfig := params.BeaconConfig()
if epoch >= beaconConfig.FuluForkEpoch {
return s.rpcHandlerByTopicFromFork(version.Fulu)
}
if epoch >= beaconConfig.ElectraForkEpoch {
return s.rpcHandlerByTopicFromFork(version.Electra)
}
if epoch >= beaconConfig.DenebForkEpoch {
return s.rpcHandlerByTopicFromFork(version.Deneb)
}
if epoch >= beaconConfig.CapellaForkEpoch {
return s.rpcHandlerByTopicFromFork(version.Capella)
}
if epoch >= beaconConfig.BellatrixForkEpoch {
return s.rpcHandlerByTopicFromFork(version.Bellatrix)
}
if epoch >= beaconConfig.AltairForkEpoch {
return s.rpcHandlerByTopicFromFork(version.Altair)
}
return s.rpcHandlerByTopicFromFork(version.Phase0)
}
// addedRPCHandlerByTopic returns the RPC handlers that are added in the new map that are not present in the old map.
func addedRPCHandlerByTopic(previous, next map[string]rpcHandler) map[string]rpcHandler {
added := make(map[string]rpcHandler)
added := make(map[string]rpcHandler, len(next))
for topic, handler := range next {
if _, ok := previous[topic]; !ok {
@@ -181,13 +147,12 @@ func removedRPCTopics(previous, next map[string]rpcHandler) map[string]bool {
}
// registerRPCHandlers for p2p RPC.
func (s *Service) registerRPCHandlers() error {
// Get the current epoch.
currentSlot := s.cfg.clock.CurrentSlot()
currentEpoch := slots.ToEpoch(currentSlot)
func (s *Service) registerRPCHandlers(nse params.NetworkScheduleEntry) error {
if s.digestActionDone(nse.ForkDigest, registerRpcOnce) {
return nil
}
// Get the RPC handlers for the current epoch.
handlerByTopic, err := s.rpcHandlerByTopicFromEpoch(currentEpoch)
handlerByTopic, err := s.rpcHandlerByTopicFromFork(nse.VersionEnum)
if err != nil {
return errors.Wrap(err, "rpc handler by topic from epoch")
}

View File

@@ -854,7 +854,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) {
blocks := make([]*ethpb.SignedBeaconBlock, 0, req.Count)
for i := req.StartSlot; i < req.StartSlot.Add(req.Count*req.Step); i += primitives.Slot(req.Step) {
code, _, err := ReadStatusCode(stream, &encoder.SszNetworkEncoder{})
if err != nil && err != io.EOF {
if err != nil && !errors.Is(err, io.EOF) {
t.Fatal(err)
}
if code != 0 || errors.Is(err, io.EOF) {

View File

@@ -180,7 +180,7 @@ type Service struct {
slasherEnabled bool
lcStore *lightClient.Store
dataColumnLogCh chan dataColumnLogEntry
registeredNetworkEntry params.NetworkScheduleEntry
digestActions perDigestSet
subscriptionSpawner func(func()) // see Service.spawn for details
}
@@ -377,10 +377,13 @@ func (s *Service) waitForChainStart() {
}
s.ctxMap = ctxMap
// Register respective rpc handlers at state initialized event.
err = s.registerRPCHandlers()
if err != nil {
log.WithError(err).Error("Could not register rpc handlers")
// We need to register RPC handlers ASAP so that we can handle incoming status message
// requests from peers.
nse := params.GetNetworkScheduleEntry(clock.CurrentEpoch())
if err := s.registerRPCHandlers(nse); err != nil {
// If we fail here, we won't be able to peer with anyone because we can't handle their status messages.
log.WithError(err).Error("Failed to register RPC handlers")
// TODO: need ability to bubble the error up to the top of the node init tree and exit safely.
return
}
@@ -401,22 +404,8 @@ func (s *Service) startDiscoveryAndSubscriptions() {
return
}
// Compute the current epoch.
currentSlot := slots.CurrentSlot(s.cfg.clock.GenesisTime())
currentEpoch := slots.ToEpoch(currentSlot)
// Compute the current fork forkDigest.
forkDigest, err := s.currentForkDigest()
if err != nil {
log.WithError(err).Error("Could not retrieve current fork digest")
return
}
// Register respective pubsub handlers at state synced event.
s.registerSubscribers(currentEpoch, forkDigest)
// Start the fork watcher.
go s.forkWatcher()
go s.p2pHandlerControlLoop()
}
func (s *Service) writeErrorResponseToStream(responseCode byte, reason string, stream libp2pcore.Stream) {
@@ -454,6 +443,15 @@ func (s *Service) chainIsStarted() bool {
return s.chainStarted.IsSet()
}
func (s *Service) waitForInitialSync(ctx context.Context) error {
select {
case <-s.initialSyncComplete:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// Checker defines a struct which can verify whether a node is currently
// synchronizing a chain with the rest of peers in the network.
type Checker interface {

View File

@@ -55,7 +55,7 @@ type subscribeParameters struct {
topicFormat string
validate wrappedVal
handle subHandler
digest [4]byte
nse params.NetworkScheduleEntry
// getSubnetsToJoin is a function that returns all subnets the node should join.
getSubnetsToJoin func(currentSlot primitives.Slot) map[uint64]bool
// getSubnetsRequiringPeers is a function that returns all subnets that require peers to be found
@@ -70,7 +70,7 @@ func (p subscribeParameters) shortTopic() string {
if fmtLen >= 3 && short[fmtLen-3:] == "_%d" {
short = short[:fmtLen-3]
}
return fmt.Sprintf(short, p.digest)
return fmt.Sprintf(short, p.nse.ForkDigest)
}
func (p subscribeParameters) logFields() logrus.Fields {
@@ -81,7 +81,7 @@ func (p subscribeParameters) logFields() logrus.Fields {
// fullTopic is the fully qualified topic string, given to gossipsub.
func (p subscribeParameters) fullTopic(subnet uint64, suffix string) string {
return fmt.Sprintf(p.topicFormat, p.digest, subnet) + suffix
return fmt.Sprintf(p.topicFormat, p.nse.ForkDigest, subnet) + suffix
}
// subnetTracker keeps track of which subnets we are subscribed to, out of the set of
@@ -204,41 +204,45 @@ func (s *Service) spawn(f func()) {
}
// Register PubSub subscribers
func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
// If we have already registered for this fork digest, exit early.
if s.digestActionDone(nse.ForkDigest, registerGossipOnce) {
return false
}
s.spawn(func() {
s.subscribe(p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, digest)
s.subscribe(p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, nse)
})
s.spawn(func() {
s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, digest)
s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, nse)
})
s.spawn(func() {
s.subscribe(p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber, digest)
s.subscribe(p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber, nse)
})
s.spawn(func() {
s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber, digest)
s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber, nse)
})
s.spawn(func() {
s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber, digest)
s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber, nse)
})
s.spawn(func() {
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.AttestationSubnetTopicFormat,
validate: s.validateCommitteeIndexBeaconAttestation,
handle: s.committeeIndexBeaconAttestationSubscriber,
digest: digest,
getSubnetsToJoin: s.persistentAndAggregatorSubnetIndices,
getSubnetsRequiringPeers: attesterSubnetIndices,
nse: nse,
})
})
// New gossip topic in Altair
if params.BeaconConfig().AltairForkEpoch <= epoch {
if params.BeaconConfig().AltairForkEpoch <= nse.Epoch {
s.spawn(func() {
s.subscribe(
p2p.SyncContributionAndProofSubnetTopicFormat,
s.validateSyncContributionAndProof,
s.syncContributionAndProofSubscriber,
digest,
nse,
)
})
s.spawn(func() {
@@ -246,8 +250,8 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
topicFormat: p2p.SyncCommitteeSubnetTopicFormat,
validate: s.validateSyncCommitteeMessage,
handle: s.syncCommitteeMessageSubscriber,
digest: digest,
getSubnetsToJoin: s.activeSyncSubnetIndices,
nse: nse,
})
})
@@ -257,7 +261,7 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
p2p.LightClientOptimisticUpdateTopicFormat,
s.validateLightClientOptimisticUpdate,
noopHandler,
digest,
nse,
)
})
s.spawn(func() {
@@ -265,32 +269,32 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
p2p.LightClientFinalityUpdateTopicFormat,
s.validateLightClientFinalityUpdate,
noopHandler,
digest,
nse,
)
})
}
}
// New gossip topic in Capella
if params.BeaconConfig().CapellaForkEpoch <= epoch {
if params.BeaconConfig().CapellaForkEpoch <= nse.Epoch {
s.spawn(func() {
s.subscribe(
p2p.BlsToExecutionChangeSubnetTopicFormat,
s.validateBlsToExecutionChange,
s.blsToExecutionChangeSubscriber,
digest,
nse,
)
})
}
// New gossip topic in Deneb, removed in Electra
if params.BeaconConfig().DenebForkEpoch <= epoch && epoch < params.BeaconConfig().ElectraForkEpoch {
if params.BeaconConfig().DenebForkEpoch <= nse.Epoch && nse.Epoch < params.BeaconConfig().ElectraForkEpoch {
s.spawn(func() {
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.BlobSubnetTopicFormat,
validate: s.validateBlob,
handle: s.blobSubscriber,
digest: digest,
nse: nse,
getSubnetsToJoin: func(primitives.Slot) map[uint64]bool {
return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCount)
},
@@ -299,13 +303,13 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
}
// New gossip topic in Electra, removed in Fulu
if params.BeaconConfig().ElectraForkEpoch <= epoch && epoch < params.BeaconConfig().FuluForkEpoch {
if params.BeaconConfig().ElectraForkEpoch <= nse.Epoch && nse.Epoch < params.BeaconConfig().FuluForkEpoch {
s.spawn(func() {
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.BlobSubnetTopicFormat,
validate: s.validateBlob,
handle: s.blobSubscriber,
digest: digest,
nse: nse,
getSubnetsToJoin: func(currentSlot primitives.Slot) map[uint64]bool {
return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCountElectra)
},
@@ -314,35 +318,54 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
}
// New gossip topic in Fulu.
if params.BeaconConfig().FuluForkEpoch <= epoch {
if params.BeaconConfig().FuluForkEpoch <= nse.Epoch {
s.spawn(func() {
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.DataColumnSubnetTopicFormat,
validate: s.validateDataColumn,
handle: s.dataColumnSubscriber,
digest: digest,
nse: nse,
getSubnetsToJoin: s.dataColumnSubnetIndices,
getSubnetsRequiringPeers: s.allDataColumnSubnets,
})
})
}
return true
}
func (s *Service) subscriptionRequestExpired(nse params.NetworkScheduleEntry) bool {
next := params.NextNetworkScheduleEntry(nse.Epoch)
return next.Epoch != nse.Epoch && s.cfg.clock.CurrentEpoch() > next.Epoch
}
func (s *Service) subscribeLogFields(topic string, nse params.NetworkScheduleEntry) logrus.Fields {
return logrus.Fields{
"topic": topic,
"digest": nse.ForkDigest,
"forkEpoch": nse.Epoch,
"currentEpoch": s.cfg.clock.CurrentEpoch(),
}
}
// subscribe to a given topic with a given validator and subscription handler.
// The base protobuf message is used to initialize new messages for decoding.
func (s *Service) subscribe(topic string, validator wrappedVal, handle subHandler, digest [4]byte) {
<-s.initialSyncComplete
_, e, err := params.ForkDataFromDigest(digest)
if err != nil {
// Impossible condition as it would mean digest does not exist.
panic(err) // lint:nopanic -- Impossible condition.
func (s *Service) subscribe(topic string, validator wrappedVal, handle subHandler, nse params.NetworkScheduleEntry) {
if err := s.waitForInitialSync(s.ctx); err != nil {
log.WithFields(s.subscribeLogFields(topic, nse)).WithError(err).Debug("Context cancelled while waiting for initial sync, not subscribing to topic")
return
}
base := p2p.GossipTopicMappings(topic, e)
// Check if this subscribe request is still valid - we may have crossed another fork epoch while waiting for initial sync.
if s.subscriptionRequestExpired(nse) {
// If we are already past the next fork epoch, do not subscribe to this topic.
log.WithFields(s.subscribeLogFields(topic, nse)).Debug("Not subscribing to topic as we are already past the next fork epoch")
return
}
base := p2p.GossipTopicMappings(topic, nse.Epoch)
if base == nil {
// Impossible condition as it would mean topic does not exist.
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) // lint:nopanic -- Impossible condition.
}
s.subscribeWithBase(s.addDigestToTopic(topic, digest), validator, handle)
s.subscribeWithBase(s.addDigestToTopic(topic, nse.ForkDigest), validator, handle)
}
func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle subHandler) *pubsub.Subscription {
@@ -352,7 +375,7 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s
// Do not resubscribe already seen subscriptions.
ok := s.subHandler.topicExists(topic)
if ok {
log.WithField("topic", topic).Debug("Provided topic already has an active subscription running")
log.WithField("topic", topic).Error("Provided topic already has an active subscription running")
return nil
}
@@ -504,89 +527,93 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p
}
}
// pruneSubscriptions unsubscribes from topics we are currently subscribed to but that are
// pruneNotWanted unsubscribes from topics we are currently subscribed to but that are
// not in the list of wanted subnets.
// This function mutates the `subscriptionBySubnet` map, which is used to keep track of the current subscriptions.
func (s *Service) pruneSubscriptions(t *subnetTracker, wantedSubnets map[uint64]bool) {
func (s *Service) pruneNotWanted(t *subnetTracker, wantedSubnets map[uint64]bool) {
for _, subnet := range t.unwanted(wantedSubnets) {
t.cancelSubscription(subnet)
s.unSubscribeFromTopic(t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix()))
}
}
// subscribeToSubnets subscribes to needed subnets and unsubscribe from unneeded ones.
// This functions mutates the `subscriptionBySubnet` map, which is used to keep track of the current subscriptions.
func (s *Service) subscribeToSubnets(t *subnetTracker) error {
// Do not subscribe if not synced.
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
return nil
}
valid, err := isDigestValid(t.digest, s.cfg.clock)
if err != nil {
return errors.Wrap(err, "is digest valid")
}
// Unsubscribe from all subnets if digest is not valid. It's likely to be the case after a hard fork.
if !valid {
s.pruneSubscriptions(t, nil)
return errInvalidDigest
}
subnetsToJoin := t.getSubnetsToJoin(s.cfg.clock.CurrentSlot())
s.pruneSubscriptions(t, subnetsToJoin)
for _, subnet := range t.missing(subnetsToJoin) {
// TODO: subscribeWithBase appends the protocol suffix, other methods don't. Make this consistent.
topic := t.fullTopic(subnet, "")
t.track(subnet, s.subscribeWithBase(topic, t.validate, t.handle))
}
return nil
}
// subscribeWithParameters subscribes to a list of subnets.
func (s *Service) subscribeWithParameters(p subscribeParameters) {
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
tracker := newSubnetTracker(p)
// Try once immediately so we don't have to wait until the next slot.
s.ensureSubnetPeersAndSubscribe(tracker)
go s.logMinimumPeersPerSubnet(p)
go s.ensurePeers(ctx, tracker)
go s.logMinimumPeersPerSubnet(ctx, p)
if err := s.waitForInitialSync(ctx); err != nil {
log.WithFields(p.logFields()).WithError(err).Debug("Could not subscribe to subnets as initial sync failed")
return
}
s.trySubscribeSubnets(tracker)
slotTicker := slots.NewSlotTicker(s.cfg.clock.GenesisTime(), params.BeaconConfig().SecondsPerSlot)
defer slotTicker.Done()
for {
select {
case <-slotTicker.C():
s.ensureSubnetPeersAndSubscribe(tracker)
// Check if this subscribe request is still valid - we may have crossed another fork epoch while waiting for initial sync.
if s.subscriptionRequestExpired(p.nse) {
// If we are already past the next fork epoch, do not subscribe to this topic.
log.WithFields(logrus.Fields{
"topic": p.shortTopic(),
"digest": p.nse.ForkDigest,
"epoch": p.nse.Epoch,
"currentEpoch": s.cfg.clock.CurrentEpoch(),
}).Debug("Exiting topic subnet subscription loop")
return
}
s.trySubscribeSubnets(tracker)
case <-s.ctx.Done():
return
}
}
}
func (s *Service) ensureSubnetPeersAndSubscribe(tracker *subnetTracker) {
timeout := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
minPeers := flags.Get().MinimumPeersPerSubnet
logFields := tracker.logFields()
neededSubnets := computeAllNeededSubnets(s.cfg.clock.CurrentSlot(), tracker.getSubnetsToJoin, tracker.getSubnetsRequiringPeers)
if err := s.subscribeToSubnets(tracker); err != nil {
if errors.Is(err, errInvalidDigest) {
log.WithFields(logFields).Debug("Digest is invalid, stopping subscription")
return
}
log.WithFields(logFields).WithError(err).Error("Could not subscribe to subnets")
return
}
ctx, cancel := context.WithTimeout(s.ctx, timeout)
defer cancel()
if err := s.cfg.p2p.FindAndDialPeersWithSubnets(ctx, tracker.topicFormat, tracker.digest, minPeers, neededSubnets); err != nil && !errors.Is(err, context.DeadlineExceeded) {
log.WithFields(logFields).WithError(err).Debug("Could not find peers with subnets")
// trySubscribeSubnets attempts to subscribe to any missing subnets that we should be subscribed to.
// Only if initial sync is complete.
func (s *Service) trySubscribeSubnets(t *subnetTracker) {
subnetsToJoin := t.getSubnetsToJoin(s.cfg.clock.CurrentSlot())
s.pruneNotWanted(t, subnetsToJoin)
for _, subnet := range t.missing(subnetsToJoin) {
// TODO: subscribeWithBase appends the protocol suffix, other methods don't. Make this consistent.
topic := t.fullTopic(subnet, "")
t.track(subnet, s.subscribeWithBase(topic, t.validate, t.handle))
}
}
func (s *Service) logMinimumPeersPerSubnet(p subscribeParameters) {
func (s *Service) ensurePeers(ctx context.Context, tracker *subnetTracker) {
// Try once immediately so we don't have to wait until the next slot.
s.tryEnsurePeers(ctx, tracker)
oncePerSlot := slots.NewSlotTicker(s.cfg.clock.GenesisTime(), params.BeaconConfig().SecondsPerSlot)
defer oncePerSlot.Done()
for {
select {
case <-oncePerSlot.C():
s.tryEnsurePeers(ctx, tracker)
case <-ctx.Done():
return
}
}
}
func (s *Service) tryEnsurePeers(ctx context.Context, tracker *subnetTracker) {
timeout := (time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second) - 100*time.Millisecond
minPeers := flags.Get().MinimumPeersPerSubnet
neededSubnets := computeAllNeededSubnets(s.cfg.clock.CurrentSlot(), tracker.getSubnetsToJoin, tracker.getSubnetsRequiringPeers)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
err := s.cfg.p2p.FindAndDialPeersWithSubnets(ctx, tracker.topicFormat, tracker.nse.ForkDigest, minPeers, neededSubnets)
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
log.WithFields(tracker.logFields()).WithError(err).Debug("Could not find peers with subnets")
}
}
func (s *Service) logMinimumPeersPerSubnet(ctx context.Context, p subscribeParameters) {
logFields := p.logFields()
minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet
// Warn the user if we are not subscribed to enough peers in the subnets.
@@ -603,7 +630,7 @@ func (s *Service) logMinimumPeersPerSubnet(p subscribeParameters) {
isSubnetWithMissingPeers := false
// Find new peers for wanted subnets if needed.
for index := range subnetsToFindPeersIndex {
topic := fmt.Sprintf(p.topicFormat, p.digest, index)
topic := fmt.Sprintf(p.topicFormat, p.nse.ForkDigest, index)
// Check if we have enough peers in the subnet. Skip if we do.
if count := s.connectedPeersCount(topic); count < minimumPeersPerSubnet {
@@ -617,7 +644,7 @@ func (s *Service) logMinimumPeersPerSubnet(p subscribeParameters) {
if !isSubnetWithMissingPeers {
log.WithFields(logFields).Debug("All subnets have enough connected peers")
}
case <-s.ctx.Done():
case <-ctx.Done():
return
}
}

View File

@@ -57,8 +57,9 @@ func TestSubscribe_ReceivesValidMessage(t *testing.T) {
}
markInitSyncComplete(t, &r)
var err error
p2pService.Digest, err = r.currentForkDigest()
require.NoError(t, err)
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
p2pService.Digest = nse.ForkDigest
topic := "/eth2/%x/voluntary_exit"
var wg sync.WaitGroup
wg.Add(1)
@@ -71,7 +72,7 @@ func TestSubscribe_ReceivesValidMessage(t *testing.T) {
}
wg.Done()
return nil
}, p2pService.Digest)
}, nse)
r.markForChainStart()
p2pService.ReceivePubSub(topic, &pb.SignedVoluntaryExit{Exit: &pb.VoluntaryExit{Epoch: 55}, Signature: make([]byte, fieldparams.BLSSignatureLength)})
@@ -105,14 +106,13 @@ func TestSubscribe_UnsubscribeTopic(t *testing.T) {
subHandler: newSubTopicHandler(),
}
markInitSyncComplete(t, &r)
var err error
p2pService.Digest, err = r.currentForkDigest()
require.NoError(t, err)
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
p2pService.Digest = nse.ForkDigest
topic := "/eth2/%x/voluntary_exit"
r.subscribe(topic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
return nil
}, p2pService.Digest)
}, nse)
r.markForChainStart()
fullTopic := fmt.Sprintf(topic, p2pService.Digest) + p2pService.Encoding().ProtocolSuffix()
@@ -160,14 +160,13 @@ func TestSubscribe_ReceivesAttesterSlashing(t *testing.T) {
topic := "/eth2/%x/attester_slashing"
var wg sync.WaitGroup
wg.Add(1)
var err error
p2pService.Digest, err = r.currentForkDigest()
require.NoError(t, err)
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
p2pService.Digest = nse.ForkDigest
r.subscribe(topic, r.noopValidator, func(ctx context.Context, msg proto.Message) error {
require.NoError(t, r.attesterSlashingSubscriber(ctx, msg))
wg.Done()
return nil
}, p2pService.Digest)
}, nse)
beaconState, privKeys := util.DeterministicGenesisState(t, 64)
chainService.State = beaconState
r.markForChainStart()
@@ -216,14 +215,13 @@ func TestSubscribe_ReceivesProposerSlashing(t *testing.T) {
wg.Add(1)
params.SetupTestConfigCleanup(t)
params.OverrideBeaconConfig(params.MainnetConfig())
var err error
p2pService.Digest, err = r.currentForkDigest()
require.NoError(t, err)
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
p2pService.Digest = nse.ForkDigest
r.subscribe(topic, r.noopValidator, func(ctx context.Context, msg proto.Message) error {
require.NoError(t, r.proposerSlashingSubscriber(ctx, msg))
wg.Done()
return nil
}, p2pService.Digest)
}, nse)
beaconState, privKeys := util.DeterministicGenesisState(t, 64)
chainService.State = beaconState
r.markForChainStart()
@@ -261,9 +259,8 @@ func TestSubscribe_HandlesPanic(t *testing.T) {
}
markInitSyncComplete(t, &r)
var err error
p.Digest, err = r.currentForkDigest()
require.NoError(t, err)
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
p.Digest = nse.ForkDigest
topic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.SignedVoluntaryExit{})]
var wg sync.WaitGroup
@@ -272,7 +269,7 @@ func TestSubscribe_HandlesPanic(t *testing.T) {
r.subscribe(topic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
defer wg.Done()
panic("bad")
}, p.Digest)
}, nse)
r.markForChainStart()
p.ReceivePubSub(topic, &pb.SignedVoluntaryExit{Exit: &pb.VoluntaryExit{Epoch: 55}, Signature: make([]byte, fieldparams.BLSSignatureLength)})
@@ -298,12 +295,11 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
chainStarted: abool.New(),
subHandler: newSubTopicHandler(),
}
digest, err := r.currentForkDigest()
require.NoError(t, err)
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
params := subscribeParameters{
topicFormat: "/eth2/testing/%#x/committee%d",
digest: digest,
nse: nse,
}
tracker := newSubnetTracker(params)
@@ -326,7 +322,7 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
require.NoError(t, err)
tracker.track(c2, sub2)
r.pruneSubscriptions(tracker, map[uint64]bool{c2: true})
r.pruneNotWanted(tracker, map[uint64]bool{c2: true})
require.LogsDoNotContain(t, hook, "Could not unregister topic validator")
}
@@ -483,6 +479,7 @@ func TestFilterSubnetPeers(t *testing.T) {
chainStarted: abool.New(),
subHandler: newSubTopicHandler(),
}
markInitSyncComplete(t, &r)
// Empty cache at the end of the test.
defer cache.SubnetIDs.EmptyAllCaches()
digest, err := r.currentForkDigest()
@@ -548,16 +545,16 @@ func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) {
chainStarted: abool.New(),
subHandler: newSubTopicHandler(),
}
markInitSyncComplete(t, &r)
// Empty cache at the end of the test.
defer cache.SyncSubnetIDs.EmptyAllCaches()
slot := r.cfg.clock.CurrentSlot()
currEpoch := slots.ToEpoch(slot)
cache.SyncSubnetIDs.AddSyncCommitteeSubnets([]byte("pubkey"), currEpoch, []uint64{0, 1}, 10*time.Second)
digest, err := r.currentForkDigest()
assert.NoError(t, err)
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
go r.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.SyncCommitteeSubnetTopicFormat,
digest: digest,
nse: nse,
getSubnetsToJoin: r.activeSyncSubnetIndices,
})
time.Sleep(2 * time.Second)
@@ -566,10 +563,10 @@ func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) {
for _, t := range r.cfg.p2p.PubSub().GetTopics() {
topicMap[t] = true
}
firstSub := fmt.Sprintf(p2p.SyncCommitteeSubnetTopicFormat, digest, 0) + r.cfg.p2p.Encoding().ProtocolSuffix()
firstSub := fmt.Sprintf(p2p.SyncCommitteeSubnetTopicFormat, nse.ForkDigest, 0) + r.cfg.p2p.Encoding().ProtocolSuffix()
assert.Equal(t, true, topicMap[firstSub])
secondSub := fmt.Sprintf(p2p.SyncCommitteeSubnetTopicFormat, digest, 1) + r.cfg.p2p.Encoding().ProtocolSuffix()
secondSub := fmt.Sprintf(p2p.SyncCommitteeSubnetTopicFormat, nse.ForkDigest, 1) + r.cfg.p2p.Encoding().ProtocolSuffix()
assert.Equal(t, true, topicMap[secondSub])
cancel()
}
@@ -600,43 +597,39 @@ func TestSubscribeWithSyncSubnets_DynamicSwitchFork(t *testing.T) {
// Empty cache at the end of the test.
defer cache.SyncSubnetIDs.EmptyAllCaches()
cache.SyncSubnetIDs.AddSyncCommitteeSubnets([]byte("pubkey"), 0, []uint64{0, 1}, 10*time.Second)
digest := params.ForkDigest(r.cfg.clock.CurrentEpoch())
version, e, err := params.ForkDataFromDigest(digest)
require.NoError(t, err)
require.Equal(t, [4]byte(params.BeaconConfig().DenebForkVersion), version)
require.Equal(t, params.BeaconConfig().DenebForkEpoch, e)
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
require.Equal(t, [4]byte(params.BeaconConfig().DenebForkVersion), nse.ForkVersion)
require.Equal(t, params.BeaconConfig().DenebForkEpoch, nse.Epoch)
sp := newSubnetTracker(subscribeParameters{
topicFormat: p2p.SyncCommitteeSubnetTopicFormat,
digest: digest,
nse: nse,
getSubnetsToJoin: r.activeSyncSubnetIndices,
})
require.NoError(t, r.subscribeToSubnets(sp))
r.trySubscribeSubnets(sp)
assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics()))
topicMap := map[string]bool{}
for _, t := range r.cfg.p2p.PubSub().GetTopics() {
topicMap[t] = true
}
firstSub := fmt.Sprintf(p2p.SyncCommitteeSubnetTopicFormat, digest, 0) + r.cfg.p2p.Encoding().ProtocolSuffix()
firstSub := fmt.Sprintf(p2p.SyncCommitteeSubnetTopicFormat, nse.ForkDigest, 0) + r.cfg.p2p.Encoding().ProtocolSuffix()
assert.Equal(t, true, topicMap[firstSub])
secondSub := fmt.Sprintf(p2p.SyncCommitteeSubnetTopicFormat, digest, 1) + r.cfg.p2p.Encoding().ProtocolSuffix()
secondSub := fmt.Sprintf(p2p.SyncCommitteeSubnetTopicFormat, nse.ForkDigest, 1) + r.cfg.p2p.Encoding().ProtocolSuffix()
assert.Equal(t, true, topicMap[secondSub])
electraSlot, err := slots.EpochStart(params.BeaconConfig().ElectraForkEpoch)
require.NoError(t, err)
mockNow.SetSlot(t, clock, electraSlot)
digest = params.ForkDigest(r.cfg.clock.CurrentEpoch())
version, e, err = params.ForkDataFromDigest(digest)
require.NoError(t, err)
require.Equal(t, [4]byte(params.BeaconConfig().ElectraForkVersion), version)
require.Equal(t, params.BeaconConfig().ElectraForkEpoch, e)
nse = params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
require.Equal(t, [4]byte(params.BeaconConfig().ElectraForkVersion), nse.ForkVersion)
require.Equal(t, params.BeaconConfig().ElectraForkEpoch, nse.Epoch)
sp.digest = digest
sp.nse = nse
// clear the cache and re-subscribe to subnets.
// this should result in the subscriptions being removed
cache.SyncSubnetIDs.EmptyAllCaches()
require.NoError(t, r.subscribeToSubnets(sp))
r.trySubscribeSubnets(sp)
assert.Equal(t, 0, len(r.cfg.p2p.PubSub().GetTopics()))
}

View File

@@ -0,0 +1,2 @@
### Fixed
- Fixed issue #15738 where separate goroutines assume sole responsibility for topic registration.