diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 44112d8e4f..00b9354aa1 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "fuzz_exports.go", # keep "log.go", "metrics.go", + "once.go", "options.go", "pending_attestations_queue.go", "pending_blocks_queue.go", @@ -172,6 +173,7 @@ go_test( "error_test.go", "fork_watcher_test.go", "kzg_batch_verifier_test.go", + "once_test.go", "pending_attestations_queue_test.go", "pending_blocks_queue_test.go", "rate_limiter_test.go", diff --git a/beacon-chain/sync/error.go b/beacon-chain/sync/error.go index c67af0417f..594820f4ad 100644 --- a/beacon-chain/sync/error.go +++ b/beacon-chain/sync/error.go @@ -2,7 +2,6 @@ package sync import ( "bytes" - "errors" "io" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p" @@ -12,6 +11,7 @@ import ( libp2pcore "github.com/libp2p/go-libp2p/core" "github.com/libp2p/go-libp2p/core/network" multiplex "github.com/libp2p/go-mplex" + "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -38,7 +38,7 @@ func ReadStatusCode(stream network.Stream, encoding encoder.NetworkEncoding) (ui b := make([]byte, 1) _, err := stream.Read(b) if err != nil { - return 0, "", err + return 0, "", errors.Wrap(err, "stream read") } if b[0] == responseCodeSuccess { @@ -52,7 +52,7 @@ func ReadStatusCode(stream network.Stream, encoding encoder.NetworkEncoding) (ui SetStreamReadDeadline(stream, params.BeaconConfig().RespTimeoutDuration()) msg := &types.ErrorMessage{} if err := encoding.DecodeWithMaxLength(stream, msg); err != nil { - return 0, "", err + return 0, "", errors.Wrap(err, "decode error message") } return b[0], string(*msg), nil diff --git a/beacon-chain/sync/fork_watcher.go b/beacon-chain/sync/fork_watcher.go index 7b6663dd70..a4791f3c40 100644 --- a/beacon-chain/sync/fork_watcher.go +++ b/beacon-chain/sync/fork_watcher.go @@ -9,23 +9,28 @@ import ( "github.com/pkg/errors" ) -// Is a background routine that observes for new incoming forks. Depending on the epoch -// it will be in charge of subscribing/unsubscribing the relevant topics at the fork boundaries. -func (s *Service) forkWatcher() { - <-s.initialSyncComplete +// p2pHandlerControlLoop runs in a continuous loop to ensure that: +// - We are subscribed to the correct gossipsub topics (for the current and upcoming epoch). +// - We have registered the correct RPC stream handlers (for the current and upcoming epoch). +// - We have cleaned up gossipsub topics and RPC stream handlers that are no longer needed. +func (s *Service) p2pHandlerControlLoop() { + // At startup, launch registration and peer discovery loops, and register rpc stream handlers. + startEntry := params.GetNetworkScheduleEntry(s.cfg.clock.CurrentEpoch()) + s.registerSubscribers(startEntry) + slotTicker := slots.NewSlotTicker(s.cfg.clock.GenesisTime(), params.BeaconConfig().SecondsPerSlot) for { select { // In the event of a node restart, we will still end up subscribing to the correct // topics during/after the fork epoch. This routine is to ensure correct // subscriptions for nodes running before a fork epoch. - case currSlot := <-slotTicker.C(): - currEpoch := slots.ToEpoch(currSlot) - if err := s.registerForUpcomingFork(currEpoch); err != nil { + case <-slotTicker.C(): + current := s.cfg.clock.CurrentEpoch() + if err := s.ensureRegistrationsForEpoch(current); err != nil { log.WithError(err).Error("Unable to check for fork in the next epoch") continue } - if err := s.deregisterFromPastFork(currEpoch); err != nil { + if err := s.ensureDeregistrationForEpoch(current); err != nil { log.WithError(err).Error("Unable to check for fork in the previous epoch") continue } @@ -37,102 +42,90 @@ func (s *Service) forkWatcher() { } } -// registerForUpcomingFork registers appropriate gossip and RPC topic if there is a fork in the next epoch. -func (s *Service) registerForUpcomingFork(currentEpoch primitives.Epoch) error { - nextEntry := params.GetNetworkScheduleEntry(currentEpoch + 1) - // Check if there is a fork in the next epoch. - if nextEntry.ForkDigest == s.registeredNetworkEntry.ForkDigest { - return nil - } +// ensureRegistrationsForEpoch ensures that gossip topic and RPC stream handler +// registrations are in place for the current and subsequent epoch. +func (s *Service) ensureRegistrationsForEpoch(epoch primitives.Epoch) error { + current := params.GetNetworkScheduleEntry(epoch) + s.registerSubscribers(current) - if s.subHandler.digestExists(nextEntry.ForkDigest) { - return nil - } - - // Register the subscribers (gossipsub) for the next epoch. - s.registerSubscribers(nextEntry.Epoch, nextEntry.ForkDigest) - - // Get the handlers for the current and next fork. - currentHandler, err := s.rpcHandlerByTopicFromEpoch(currentEpoch) + currentHandler, err := s.rpcHandlerByTopicFromFork(current.VersionEnum) if err != nil { return errors.Wrap(err, "RPC handler by topic from before fork epoch") } + if !s.digestActionDone(current.ForkDigest, registerRpcOnce) { + for topic, handler := range currentHandler { + s.registerRPC(topic, handler) + } + } - nextHandler, err := s.rpcHandlerByTopicFromEpoch(nextEntry.Epoch) + next := params.GetNetworkScheduleEntry(epoch + 1) + if current.Epoch == next.Epoch { + return nil // no fork in the next epoch + } + s.registerSubscribers(next) + + if s.digestActionDone(next.ForkDigest, registerRpcOnce) { + return nil + } + + nextHandler, err := s.rpcHandlerByTopicFromFork(next.VersionEnum) if err != nil { return errors.Wrap(err, "RPC handler by topic from fork epoch") } - // Compute newly added topics. newHandlersByTopic := addedRPCHandlerByTopic(currentHandler, nextHandler) - // Register the new RPC handlers. + // We deregister the old topics later, at least one epoch after the fork. for topic, handler := range newHandlersByTopic { s.registerRPC(topic, handler) } - s.registeredNetworkEntry = nextEntry return nil } -// deregisterFromPastFork deregisters appropriate gossip and RPC topic if there is a fork in the current epoch. -func (s *Service) deregisterFromPastFork(currentEpoch primitives.Epoch) error { - // Get the fork. - currentFork, err := params.Fork(currentEpoch) - if err != nil { - return errors.Wrap(err, "genesis validators root") - } +// ensureDeregistrationForEpoch deregisters appropriate gossip and RPC topic if there is a fork in the current epoch. +func (s *Service) ensureDeregistrationForEpoch(currentEpoch primitives.Epoch) error { + current := params.GetNetworkScheduleEntry(currentEpoch) // If we are still in our genesis fork version then exit early. - if currentFork.Epoch == params.BeaconConfig().GenesisEpoch { + if current.Epoch == params.BeaconConfig().GenesisEpoch { return nil } + if currentEpoch < current.Epoch+1 { + return nil // wait until we are 1 epoch into the fork + } - // Get the epoch after the fork epoch. - afterForkEpoch := currentFork.Epoch + 1 + previous := params.GetNetworkScheduleEntry(current.Epoch - 1) + // Remove stream handlers for all topics that are in the set of + // currentTopics-previousTopics + if !s.digestActionDone(previous.ForkDigest, unregisterRpcOnce) { + previousTopics, err := s.rpcHandlerByTopicFromFork(previous.VersionEnum) + if err != nil { + return errors.Wrap(err, "RPC handler by topic from before fork epoch") + } + currentTopics, err := s.rpcHandlerByTopicFromFork(current.VersionEnum) + if err != nil { + return errors.Wrap(err, "RPC handler by topic from fork epoch") + } + topicsToRemove := removedRPCTopics(previousTopics, currentTopics) + for topic := range topicsToRemove { + fullTopic := topic + s.cfg.p2p.Encoding().ProtocolSuffix() + s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullTopic)) + log.WithField("topic", fullTopic).Debug("Removed RPC handler") + } + } - // Start de-registering if the current epoch is after the fork epoch. - if currentEpoch != afterForkEpoch { + // Unsubscribe from all gossip topics with the previous fork digest. + if s.digestActionDone(previous.ForkDigest, unregisterGossipOnce) { return nil } - - // Look at the previous fork's digest. - beforeForkEpoch := currentFork.Epoch - 1 - - beforeForkDigest := params.ForkDigest(beforeForkEpoch) - - // Exit early if there are no topics with that particular digest. - if !s.subHandler.digestExists(beforeForkDigest) { - return nil - } - - // Compute the RPC handlers that are no longer needed. - beforeForkHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(beforeForkEpoch) - if err != nil { - return errors.Wrap(err, "RPC handler by topic from before fork epoch") - } - - forkHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(currentFork.Epoch) - if err != nil { - return errors.Wrap(err, "RPC handler by topic from fork epoch") - } - - topicsToRemove := removedRPCTopics(beforeForkHandlerByTopic, forkHandlerByTopic) - for topic := range topicsToRemove { - fullTopic := topic + s.cfg.p2p.Encoding().ProtocolSuffix() - s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullTopic)) - log.WithField("topic", fullTopic).Debug("Removed RPC handler") - } - - // Run through all our current active topics and see - // if there are any subscriptions to be removed. for _, t := range s.subHandler.allTopics() { retDigest, err := p2p.ExtractGossipDigest(t) if err != nil { log.WithError(err).Error("Could not retrieve digest") continue } - if retDigest == beforeForkDigest { + if retDigest == previous.ForkDigest { s.unSubscribeFromTopic(t) } } diff --git a/beacon-chain/sync/fork_watcher_test.go b/beacon-chain/sync/fork_watcher_test.go index 1375b1a51f..e6466a763e 100644 --- a/beacon-chain/sync/fork_watcher_test.go +++ b/beacon-chain/sync/fork_watcher_test.go @@ -50,12 +50,36 @@ func testForkWatcherService(t *testing.T, current primitives.Epoch) *Service { return r } +func TestRegisterSubscriptions_Idempotent(t *testing.T) { + params.SetupTestConfigCleanup(t) + genesis.StoreEmbeddedDuringTest(t, params.BeaconConfig().ConfigName) + fulu := params.BeaconConfig().ElectraForkEpoch + 4096*2 + params.BeaconConfig().FuluForkEpoch = fulu + params.BeaconConfig().InitializeForkSchedule() + + current := fulu - 1 + s := testForkWatcherService(t, current) + next := params.GetNetworkScheduleEntry(fulu) + wg := attachSpawner(s) + require.Equal(t, true, s.registerSubscribers(next)) + done := make(chan struct{}) + go func() { wg.Wait(); close(done) }() + select { + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for subscriptions to be registered") + case <-done: + } + // the goal of this callback is just to assert that spawn is never called. + s.subscriptionSpawner = func(func()) { t.Error("registration routines spawned twice for the same digest") } + require.NoError(t, s.ensureRegistrationsForEpoch(fulu)) +} + func TestService_CheckForNextEpochFork(t *testing.T) { closedChan := make(chan struct{}) close(closedChan) params.SetupTestConfigCleanup(t) genesis.StoreEmbeddedDuringTest(t, params.BeaconConfig().ConfigName) - params.BeaconConfig().FuluForkEpoch = params.BeaconConfig().ElectraForkEpoch + 1096*2 + params.BeaconConfig().FuluForkEpoch = params.BeaconConfig().ElectraForkEpoch + 4096*2 params.BeaconConfig().InitializeForkSchedule() tests := []struct { @@ -171,7 +195,7 @@ func TestService_CheckForNextEpochFork(t *testing.T) { current := tt.epochAtRegistration(tt.forkEpoch) s := testForkWatcherService(t, current) wg := attachSpawner(s) - require.NoError(t, s.registerForUpcomingFork(s.cfg.clock.CurrentEpoch())) + require.NoError(t, s.ensureRegistrationsForEpoch(s.cfg.clock.CurrentEpoch())) wg.Wait() tt.checkRegistration(t, s) @@ -193,10 +217,13 @@ func TestService_CheckForNextEpochFork(t *testing.T) { // Move the clock to just before the next fork epoch and ensure deregistration is correct wg = attachSpawner(s) s.cfg.clock = defaultClockWithTimeAtEpoch(tt.nextForkEpoch - 1) - require.NoError(t, s.registerForUpcomingFork(s.cfg.clock.CurrentEpoch())) + require.NoError(t, s.ensureRegistrationsForEpoch(s.cfg.clock.CurrentEpoch())) wg.Wait() + + require.NoError(t, s.ensureDeregistrationForEpoch(tt.nextForkEpoch)) + assert.Equal(t, true, s.subHandler.digestExists(digest)) // deregister as if it is the epoch after the next fork epoch - require.NoError(t, s.deregisterFromPastFork(tt.nextForkEpoch+1)) + require.NoError(t, s.ensureDeregistrationForEpoch(tt.nextForkEpoch+1)) assert.Equal(t, false, s.subHandler.digestExists(digest)) assert.Equal(t, true, s.subHandler.digestExists(nextDigest)) }) diff --git a/beacon-chain/sync/once.go b/beacon-chain/sync/once.go new file mode 100644 index 0000000000..65f3551c19 --- /dev/null +++ b/beacon-chain/sync/once.go @@ -0,0 +1,40 @@ +package sync + +import "sync" + +// oncePerDigest represents an action that should only be performed once per fork digest. +type oncePerDigest uint8 + +const ( + doneZero oncePerDigest = 0 + registerGossipOnce oncePerDigest = 1 << 0 + unregisterGossipOnce oncePerDigest = 1 << 1 + registerRpcOnce oncePerDigest = 1 << 2 + unregisterRpcOnce oncePerDigest = 1 << 3 +) + +// perDigestSet keeps track of which oncePerDigest actions +// have been performed for each fork digest. +type perDigestSet struct { + sync.Mutex + history map[[4]byte]oncePerDigest +} + +// digestActionDone marks the action as done for the given digest, returning true if it was already done. +func (s *Service) digestActionDone(digest [4]byte, action oncePerDigest) bool { + s.digestActions.Lock() + defer s.digestActions.Unlock() + // lazy initialize registrationHistory; the lock is not a reference type so it is ready to go + if s.digestActions.history == nil { + s.digestActions.history = make(map[[4]byte]oncePerDigest) + } + + prev := s.digestActions.history[digest] + // Return true if the bit was already set + if prev&action != 0 { + return true + } + + s.digestActions.history[digest] = prev | action + return false +} diff --git a/beacon-chain/sync/once_test.go b/beacon-chain/sync/once_test.go new file mode 100644 index 0000000000..82729b8788 --- /dev/null +++ b/beacon-chain/sync/once_test.go @@ -0,0 +1,40 @@ +package sync + +import ( + "fmt" + "slices" + "testing" +) + +func TestDigestActionDone(t *testing.T) { + digests := [][4]byte{ + {0, 0, 0, 0}, + {1, 2, 3, 4}, + {4, 3, 2, 1}, + } + actions := []oncePerDigest{ + registerGossipOnce, + unregisterGossipOnce, + registerRpcOnce, + unregisterRpcOnce, + } + testCombos := func(d [][4]byte, a []oncePerDigest) { + s := &Service{} + for _, digest := range d { + for _, action := range a { + t.Run(fmt.Sprintf("digest=%#x/action=%d", digest, action), func(t *testing.T) { + if s.digestActionDone(digest, action) { + t.Fatal("expected first call to return false") + } + if !s.digestActionDone(digest, action) { + t.Fatal("expected second call to return true") + } + }) + } + } + } + testCombos(digests, actions) + slices.Reverse(digests) + slices.Reverse(actions) + testCombos(digests, actions) +} diff --git a/beacon-chain/sync/rpc.go b/beacon-chain/sync/rpc.go index 6122748cb3..edfa79c241 100644 --- a/beacon-chain/sync/rpc.go +++ b/beacon-chain/sync/rpc.go @@ -11,11 +11,9 @@ import ( p2ptypes "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types" "github.com/OffchainLabs/prysm/v6/config/features" "github.com/OffchainLabs/prysm/v6/config/params" - "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" "github.com/OffchainLabs/prysm/v6/monitoring/tracing" "github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace" "github.com/OffchainLabs/prysm/v6/runtime/version" - "github.com/OffchainLabs/prysm/v6/time/slots" libp2pcore "github.com/libp2p/go-libp2p/core" "github.com/libp2p/go-libp2p/core/network" "github.com/pkg/errors" @@ -122,41 +120,9 @@ func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandle return nil, errors.Errorf("RPC handler not found for fork index %d", forkIndex) } -// rpcHandlerByTopic returns the RPC handlers for a given epoch. -func (s *Service) rpcHandlerByTopicFromEpoch(epoch primitives.Epoch) (map[string]rpcHandler, error) { - // Get the beacon config. - beaconConfig := params.BeaconConfig() - - if epoch >= beaconConfig.FuluForkEpoch { - return s.rpcHandlerByTopicFromFork(version.Fulu) - } - - if epoch >= beaconConfig.ElectraForkEpoch { - return s.rpcHandlerByTopicFromFork(version.Electra) - } - - if epoch >= beaconConfig.DenebForkEpoch { - return s.rpcHandlerByTopicFromFork(version.Deneb) - } - - if epoch >= beaconConfig.CapellaForkEpoch { - return s.rpcHandlerByTopicFromFork(version.Capella) - } - - if epoch >= beaconConfig.BellatrixForkEpoch { - return s.rpcHandlerByTopicFromFork(version.Bellatrix) - } - - if epoch >= beaconConfig.AltairForkEpoch { - return s.rpcHandlerByTopicFromFork(version.Altair) - } - - return s.rpcHandlerByTopicFromFork(version.Phase0) -} - // addedRPCHandlerByTopic returns the RPC handlers that are added in the new map that are not present in the old map. func addedRPCHandlerByTopic(previous, next map[string]rpcHandler) map[string]rpcHandler { - added := make(map[string]rpcHandler) + added := make(map[string]rpcHandler, len(next)) for topic, handler := range next { if _, ok := previous[topic]; !ok { @@ -181,13 +147,12 @@ func removedRPCTopics(previous, next map[string]rpcHandler) map[string]bool { } // registerRPCHandlers for p2p RPC. -func (s *Service) registerRPCHandlers() error { - // Get the current epoch. - currentSlot := s.cfg.clock.CurrentSlot() - currentEpoch := slots.ToEpoch(currentSlot) - +func (s *Service) registerRPCHandlers(nse params.NetworkScheduleEntry) error { + if s.digestActionDone(nse.ForkDigest, registerRpcOnce) { + return nil + } // Get the RPC handlers for the current epoch. - handlerByTopic, err := s.rpcHandlerByTopicFromEpoch(currentEpoch) + handlerByTopic, err := s.rpcHandlerByTopicFromFork(nse.VersionEnum) if err != nil { return errors.Wrap(err, "rpc handler by topic from epoch") } diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go index 1e58011511..0b893b50aa 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go @@ -854,7 +854,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) { blocks := make([]*ethpb.SignedBeaconBlock, 0, req.Count) for i := req.StartSlot; i < req.StartSlot.Add(req.Count*req.Step); i += primitives.Slot(req.Step) { code, _, err := ReadStatusCode(stream, &encoder.SszNetworkEncoder{}) - if err != nil && err != io.EOF { + if err != nil && !errors.Is(err, io.EOF) { t.Fatal(err) } if code != 0 || errors.Is(err, io.EOF) { diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 22edbe23dc..02ccffd9f3 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -180,7 +180,7 @@ type Service struct { slasherEnabled bool lcStore *lightClient.Store dataColumnLogCh chan dataColumnLogEntry - registeredNetworkEntry params.NetworkScheduleEntry + digestActions perDigestSet subscriptionSpawner func(func()) // see Service.spawn for details } @@ -377,10 +377,13 @@ func (s *Service) waitForChainStart() { } s.ctxMap = ctxMap - // Register respective rpc handlers at state initialized event. - err = s.registerRPCHandlers() - if err != nil { - log.WithError(err).Error("Could not register rpc handlers") + // We need to register RPC handlers ASAP so that we can handle incoming status message + // requests from peers. + nse := params.GetNetworkScheduleEntry(clock.CurrentEpoch()) + if err := s.registerRPCHandlers(nse); err != nil { + // If we fail here, we won't be able to peer with anyone because we can't handle their status messages. + log.WithError(err).Error("Failed to register RPC handlers") + // TODO: need ability to bubble the error up to the top of the node init tree and exit safely. return } @@ -401,22 +404,8 @@ func (s *Service) startDiscoveryAndSubscriptions() { return } - // Compute the current epoch. - currentSlot := slots.CurrentSlot(s.cfg.clock.GenesisTime()) - currentEpoch := slots.ToEpoch(currentSlot) - - // Compute the current fork forkDigest. - forkDigest, err := s.currentForkDigest() - if err != nil { - log.WithError(err).Error("Could not retrieve current fork digest") - return - } - - // Register respective pubsub handlers at state synced event. - s.registerSubscribers(currentEpoch, forkDigest) - // Start the fork watcher. - go s.forkWatcher() + go s.p2pHandlerControlLoop() } func (s *Service) writeErrorResponseToStream(responseCode byte, reason string, stream libp2pcore.Stream) { @@ -454,6 +443,15 @@ func (s *Service) chainIsStarted() bool { return s.chainStarted.IsSet() } +func (s *Service) waitForInitialSync(ctx context.Context) error { + select { + case <-s.initialSyncComplete: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + // Checker defines a struct which can verify whether a node is currently // synchronizing a chain with the rest of peers in the network. type Checker interface { diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 4d32c10b65..103f3e3825 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -55,7 +55,7 @@ type subscribeParameters struct { topicFormat string validate wrappedVal handle subHandler - digest [4]byte + nse params.NetworkScheduleEntry // getSubnetsToJoin is a function that returns all subnets the node should join. getSubnetsToJoin func(currentSlot primitives.Slot) map[uint64]bool // getSubnetsRequiringPeers is a function that returns all subnets that require peers to be found @@ -70,7 +70,7 @@ func (p subscribeParameters) shortTopic() string { if fmtLen >= 3 && short[fmtLen-3:] == "_%d" { short = short[:fmtLen-3] } - return fmt.Sprintf(short, p.digest) + return fmt.Sprintf(short, p.nse.ForkDigest) } func (p subscribeParameters) logFields() logrus.Fields { @@ -81,7 +81,7 @@ func (p subscribeParameters) logFields() logrus.Fields { // fullTopic is the fully qualified topic string, given to gossipsub. func (p subscribeParameters) fullTopic(subnet uint64, suffix string) string { - return fmt.Sprintf(p.topicFormat, p.digest, subnet) + suffix + return fmt.Sprintf(p.topicFormat, p.nse.ForkDigest, subnet) + suffix } // subnetTracker keeps track of which subnets we are subscribed to, out of the set of @@ -204,41 +204,45 @@ func (s *Service) spawn(f func()) { } // Register PubSub subscribers -func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { +func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool { + // If we have already registered for this fork digest, exit early. + if s.digestActionDone(nse.ForkDigest, registerGossipOnce) { + return false + } s.spawn(func() { - s.subscribe(p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, digest) + s.subscribe(p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, nse) }) s.spawn(func() { - s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, digest) + s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, nse) }) s.spawn(func() { - s.subscribe(p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber, digest) + s.subscribe(p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber, nse) }) s.spawn(func() { - s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber, digest) + s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber, nse) }) s.spawn(func() { - s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber, digest) + s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber, nse) }) s.spawn(func() { s.subscribeWithParameters(subscribeParameters{ topicFormat: p2p.AttestationSubnetTopicFormat, validate: s.validateCommitteeIndexBeaconAttestation, handle: s.committeeIndexBeaconAttestationSubscriber, - digest: digest, getSubnetsToJoin: s.persistentAndAggregatorSubnetIndices, getSubnetsRequiringPeers: attesterSubnetIndices, + nse: nse, }) }) // New gossip topic in Altair - if params.BeaconConfig().AltairForkEpoch <= epoch { + if params.BeaconConfig().AltairForkEpoch <= nse.Epoch { s.spawn(func() { s.subscribe( p2p.SyncContributionAndProofSubnetTopicFormat, s.validateSyncContributionAndProof, s.syncContributionAndProofSubscriber, - digest, + nse, ) }) s.spawn(func() { @@ -246,8 +250,8 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { topicFormat: p2p.SyncCommitteeSubnetTopicFormat, validate: s.validateSyncCommitteeMessage, handle: s.syncCommitteeMessageSubscriber, - digest: digest, getSubnetsToJoin: s.activeSyncSubnetIndices, + nse: nse, }) }) @@ -257,7 +261,7 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { p2p.LightClientOptimisticUpdateTopicFormat, s.validateLightClientOptimisticUpdate, noopHandler, - digest, + nse, ) }) s.spawn(func() { @@ -265,32 +269,32 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { p2p.LightClientFinalityUpdateTopicFormat, s.validateLightClientFinalityUpdate, noopHandler, - digest, + nse, ) }) } } // New gossip topic in Capella - if params.BeaconConfig().CapellaForkEpoch <= epoch { + if params.BeaconConfig().CapellaForkEpoch <= nse.Epoch { s.spawn(func() { s.subscribe( p2p.BlsToExecutionChangeSubnetTopicFormat, s.validateBlsToExecutionChange, s.blsToExecutionChangeSubscriber, - digest, + nse, ) }) } // New gossip topic in Deneb, removed in Electra - if params.BeaconConfig().DenebForkEpoch <= epoch && epoch < params.BeaconConfig().ElectraForkEpoch { + if params.BeaconConfig().DenebForkEpoch <= nse.Epoch && nse.Epoch < params.BeaconConfig().ElectraForkEpoch { s.spawn(func() { s.subscribeWithParameters(subscribeParameters{ topicFormat: p2p.BlobSubnetTopicFormat, validate: s.validateBlob, handle: s.blobSubscriber, - digest: digest, + nse: nse, getSubnetsToJoin: func(primitives.Slot) map[uint64]bool { return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCount) }, @@ -299,13 +303,13 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { } // New gossip topic in Electra, removed in Fulu - if params.BeaconConfig().ElectraForkEpoch <= epoch && epoch < params.BeaconConfig().FuluForkEpoch { + if params.BeaconConfig().ElectraForkEpoch <= nse.Epoch && nse.Epoch < params.BeaconConfig().FuluForkEpoch { s.spawn(func() { s.subscribeWithParameters(subscribeParameters{ topicFormat: p2p.BlobSubnetTopicFormat, validate: s.validateBlob, handle: s.blobSubscriber, - digest: digest, + nse: nse, getSubnetsToJoin: func(currentSlot primitives.Slot) map[uint64]bool { return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCountElectra) }, @@ -314,35 +318,54 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { } // New gossip topic in Fulu. - if params.BeaconConfig().FuluForkEpoch <= epoch { + if params.BeaconConfig().FuluForkEpoch <= nse.Epoch { s.spawn(func() { s.subscribeWithParameters(subscribeParameters{ topicFormat: p2p.DataColumnSubnetTopicFormat, validate: s.validateDataColumn, handle: s.dataColumnSubscriber, - digest: digest, + nse: nse, getSubnetsToJoin: s.dataColumnSubnetIndices, getSubnetsRequiringPeers: s.allDataColumnSubnets, }) }) } + return true +} + +func (s *Service) subscriptionRequestExpired(nse params.NetworkScheduleEntry) bool { + next := params.NextNetworkScheduleEntry(nse.Epoch) + return next.Epoch != nse.Epoch && s.cfg.clock.CurrentEpoch() > next.Epoch +} + +func (s *Service) subscribeLogFields(topic string, nse params.NetworkScheduleEntry) logrus.Fields { + return logrus.Fields{ + "topic": topic, + "digest": nse.ForkDigest, + "forkEpoch": nse.Epoch, + "currentEpoch": s.cfg.clock.CurrentEpoch(), + } } // subscribe to a given topic with a given validator and subscription handler. // The base protobuf message is used to initialize new messages for decoding. -func (s *Service) subscribe(topic string, validator wrappedVal, handle subHandler, digest [4]byte) { - <-s.initialSyncComplete - _, e, err := params.ForkDataFromDigest(digest) - if err != nil { - // Impossible condition as it would mean digest does not exist. - panic(err) // lint:nopanic -- Impossible condition. +func (s *Service) subscribe(topic string, validator wrappedVal, handle subHandler, nse params.NetworkScheduleEntry) { + if err := s.waitForInitialSync(s.ctx); err != nil { + log.WithFields(s.subscribeLogFields(topic, nse)).WithError(err).Debug("Context cancelled while waiting for initial sync, not subscribing to topic") + return } - base := p2p.GossipTopicMappings(topic, e) + // Check if this subscribe request is still valid - we may have crossed another fork epoch while waiting for initial sync. + if s.subscriptionRequestExpired(nse) { + // If we are already past the next fork epoch, do not subscribe to this topic. + log.WithFields(s.subscribeLogFields(topic, nse)).Debug("Not subscribing to topic as we are already past the next fork epoch") + return + } + base := p2p.GossipTopicMappings(topic, nse.Epoch) if base == nil { // Impossible condition as it would mean topic does not exist. panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) // lint:nopanic -- Impossible condition. } - s.subscribeWithBase(s.addDigestToTopic(topic, digest), validator, handle) + s.subscribeWithBase(s.addDigestToTopic(topic, nse.ForkDigest), validator, handle) } func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle subHandler) *pubsub.Subscription { @@ -352,7 +375,7 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s // Do not resubscribe already seen subscriptions. ok := s.subHandler.topicExists(topic) if ok { - log.WithField("topic", topic).Debug("Provided topic already has an active subscription running") + log.WithField("topic", topic).Error("Provided topic already has an active subscription running") return nil } @@ -504,89 +527,93 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p } } -// pruneSubscriptions unsubscribes from topics we are currently subscribed to but that are +// pruneNotWanted unsubscribes from topics we are currently subscribed to but that are // not in the list of wanted subnets. -// This function mutates the `subscriptionBySubnet` map, which is used to keep track of the current subscriptions. -func (s *Service) pruneSubscriptions(t *subnetTracker, wantedSubnets map[uint64]bool) { +func (s *Service) pruneNotWanted(t *subnetTracker, wantedSubnets map[uint64]bool) { for _, subnet := range t.unwanted(wantedSubnets) { t.cancelSubscription(subnet) s.unSubscribeFromTopic(t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix())) } } -// subscribeToSubnets subscribes to needed subnets and unsubscribe from unneeded ones. -// This functions mutates the `subscriptionBySubnet` map, which is used to keep track of the current subscriptions. -func (s *Service) subscribeToSubnets(t *subnetTracker) error { - // Do not subscribe if not synced. - if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { - return nil - } - - valid, err := isDigestValid(t.digest, s.cfg.clock) - if err != nil { - return errors.Wrap(err, "is digest valid") - } - - // Unsubscribe from all subnets if digest is not valid. It's likely to be the case after a hard fork. - if !valid { - s.pruneSubscriptions(t, nil) - return errInvalidDigest - } - - subnetsToJoin := t.getSubnetsToJoin(s.cfg.clock.CurrentSlot()) - s.pruneSubscriptions(t, subnetsToJoin) - for _, subnet := range t.missing(subnetsToJoin) { - // TODO: subscribeWithBase appends the protocol suffix, other methods don't. Make this consistent. - topic := t.fullTopic(subnet, "") - t.track(subnet, s.subscribeWithBase(topic, t.validate, t.handle)) - } - - return nil -} - // subscribeWithParameters subscribes to a list of subnets. func (s *Service) subscribeWithParameters(p subscribeParameters) { + ctx, cancel := context.WithCancel(s.ctx) + defer cancel() + tracker := newSubnetTracker(p) - // Try once immediately so we don't have to wait until the next slot. - s.ensureSubnetPeersAndSubscribe(tracker) - - go s.logMinimumPeersPerSubnet(p) + go s.ensurePeers(ctx, tracker) + go s.logMinimumPeersPerSubnet(ctx, p) + if err := s.waitForInitialSync(ctx); err != nil { + log.WithFields(p.logFields()).WithError(err).Debug("Could not subscribe to subnets as initial sync failed") + return + } + s.trySubscribeSubnets(tracker) slotTicker := slots.NewSlotTicker(s.cfg.clock.GenesisTime(), params.BeaconConfig().SecondsPerSlot) defer slotTicker.Done() for { select { case <-slotTicker.C(): - s.ensureSubnetPeersAndSubscribe(tracker) + // Check if this subscribe request is still valid - we may have crossed another fork epoch while waiting for initial sync. + if s.subscriptionRequestExpired(p.nse) { + // If we are already past the next fork epoch, do not subscribe to this topic. + log.WithFields(logrus.Fields{ + "topic": p.shortTopic(), + "digest": p.nse.ForkDigest, + "epoch": p.nse.Epoch, + "currentEpoch": s.cfg.clock.CurrentEpoch(), + }).Debug("Exiting topic subnet subscription loop") + return + } + s.trySubscribeSubnets(tracker) case <-s.ctx.Done(): return } } } -func (s *Service) ensureSubnetPeersAndSubscribe(tracker *subnetTracker) { - timeout := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second - minPeers := flags.Get().MinimumPeersPerSubnet - logFields := tracker.logFields() - neededSubnets := computeAllNeededSubnets(s.cfg.clock.CurrentSlot(), tracker.getSubnetsToJoin, tracker.getSubnetsRequiringPeers) - - if err := s.subscribeToSubnets(tracker); err != nil { - if errors.Is(err, errInvalidDigest) { - log.WithFields(logFields).Debug("Digest is invalid, stopping subscription") - return - } - log.WithFields(logFields).WithError(err).Error("Could not subscribe to subnets") - return - } - - ctx, cancel := context.WithTimeout(s.ctx, timeout) - defer cancel() - if err := s.cfg.p2p.FindAndDialPeersWithSubnets(ctx, tracker.topicFormat, tracker.digest, minPeers, neededSubnets); err != nil && !errors.Is(err, context.DeadlineExceeded) { - log.WithFields(logFields).WithError(err).Debug("Could not find peers with subnets") +// trySubscribeSubnets attempts to subscribe to any missing subnets that we should be subscribed to. +// Only if initial sync is complete. +func (s *Service) trySubscribeSubnets(t *subnetTracker) { + subnetsToJoin := t.getSubnetsToJoin(s.cfg.clock.CurrentSlot()) + s.pruneNotWanted(t, subnetsToJoin) + for _, subnet := range t.missing(subnetsToJoin) { + // TODO: subscribeWithBase appends the protocol suffix, other methods don't. Make this consistent. + topic := t.fullTopic(subnet, "") + t.track(subnet, s.subscribeWithBase(topic, t.validate, t.handle)) } } -func (s *Service) logMinimumPeersPerSubnet(p subscribeParameters) { +func (s *Service) ensurePeers(ctx context.Context, tracker *subnetTracker) { + // Try once immediately so we don't have to wait until the next slot. + s.tryEnsurePeers(ctx, tracker) + + oncePerSlot := slots.NewSlotTicker(s.cfg.clock.GenesisTime(), params.BeaconConfig().SecondsPerSlot) + defer oncePerSlot.Done() + for { + select { + case <-oncePerSlot.C(): + s.tryEnsurePeers(ctx, tracker) + case <-ctx.Done(): + return + } + } +} + +func (s *Service) tryEnsurePeers(ctx context.Context, tracker *subnetTracker) { + timeout := (time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second) - 100*time.Millisecond + minPeers := flags.Get().MinimumPeersPerSubnet + neededSubnets := computeAllNeededSubnets(s.cfg.clock.CurrentSlot(), tracker.getSubnetsToJoin, tracker.getSubnetsRequiringPeers) + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + err := s.cfg.p2p.FindAndDialPeersWithSubnets(ctx, tracker.topicFormat, tracker.nse.ForkDigest, minPeers, neededSubnets) + if err != nil && !errors.Is(err, context.DeadlineExceeded) { + log.WithFields(tracker.logFields()).WithError(err).Debug("Could not find peers with subnets") + } +} + +func (s *Service) logMinimumPeersPerSubnet(ctx context.Context, p subscribeParameters) { logFields := p.logFields() minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet // Warn the user if we are not subscribed to enough peers in the subnets. @@ -603,7 +630,7 @@ func (s *Service) logMinimumPeersPerSubnet(p subscribeParameters) { isSubnetWithMissingPeers := false // Find new peers for wanted subnets if needed. for index := range subnetsToFindPeersIndex { - topic := fmt.Sprintf(p.topicFormat, p.digest, index) + topic := fmt.Sprintf(p.topicFormat, p.nse.ForkDigest, index) // Check if we have enough peers in the subnet. Skip if we do. if count := s.connectedPeersCount(topic); count < minimumPeersPerSubnet { @@ -617,7 +644,7 @@ func (s *Service) logMinimumPeersPerSubnet(p subscribeParameters) { if !isSubnetWithMissingPeers { log.WithFields(logFields).Debug("All subnets have enough connected peers") } - case <-s.ctx.Done(): + case <-ctx.Done(): return } } diff --git a/beacon-chain/sync/subscriber_test.go b/beacon-chain/sync/subscriber_test.go index 3f9f9c4c9f..fd6ac72e85 100644 --- a/beacon-chain/sync/subscriber_test.go +++ b/beacon-chain/sync/subscriber_test.go @@ -57,8 +57,9 @@ func TestSubscribe_ReceivesValidMessage(t *testing.T) { } markInitSyncComplete(t, &r) var err error - p2pService.Digest, err = r.currentForkDigest() require.NoError(t, err) + nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch()) + p2pService.Digest = nse.ForkDigest topic := "/eth2/%x/voluntary_exit" var wg sync.WaitGroup wg.Add(1) @@ -71,7 +72,7 @@ func TestSubscribe_ReceivesValidMessage(t *testing.T) { } wg.Done() return nil - }, p2pService.Digest) + }, nse) r.markForChainStart() p2pService.ReceivePubSub(topic, &pb.SignedVoluntaryExit{Exit: &pb.VoluntaryExit{Epoch: 55}, Signature: make([]byte, fieldparams.BLSSignatureLength)}) @@ -105,14 +106,13 @@ func TestSubscribe_UnsubscribeTopic(t *testing.T) { subHandler: newSubTopicHandler(), } markInitSyncComplete(t, &r) - var err error - p2pService.Digest, err = r.currentForkDigest() - require.NoError(t, err) + nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch()) + p2pService.Digest = nse.ForkDigest topic := "/eth2/%x/voluntary_exit" r.subscribe(topic, r.noopValidator, func(_ context.Context, msg proto.Message) error { return nil - }, p2pService.Digest) + }, nse) r.markForChainStart() fullTopic := fmt.Sprintf(topic, p2pService.Digest) + p2pService.Encoding().ProtocolSuffix() @@ -160,14 +160,13 @@ func TestSubscribe_ReceivesAttesterSlashing(t *testing.T) { topic := "/eth2/%x/attester_slashing" var wg sync.WaitGroup wg.Add(1) - var err error - p2pService.Digest, err = r.currentForkDigest() - require.NoError(t, err) + nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch()) + p2pService.Digest = nse.ForkDigest r.subscribe(topic, r.noopValidator, func(ctx context.Context, msg proto.Message) error { require.NoError(t, r.attesterSlashingSubscriber(ctx, msg)) wg.Done() return nil - }, p2pService.Digest) + }, nse) beaconState, privKeys := util.DeterministicGenesisState(t, 64) chainService.State = beaconState r.markForChainStart() @@ -216,14 +215,13 @@ func TestSubscribe_ReceivesProposerSlashing(t *testing.T) { wg.Add(1) params.SetupTestConfigCleanup(t) params.OverrideBeaconConfig(params.MainnetConfig()) - var err error - p2pService.Digest, err = r.currentForkDigest() - require.NoError(t, err) + nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch()) + p2pService.Digest = nse.ForkDigest r.subscribe(topic, r.noopValidator, func(ctx context.Context, msg proto.Message) error { require.NoError(t, r.proposerSlashingSubscriber(ctx, msg)) wg.Done() return nil - }, p2pService.Digest) + }, nse) beaconState, privKeys := util.DeterministicGenesisState(t, 64) chainService.State = beaconState r.markForChainStart() @@ -261,9 +259,8 @@ func TestSubscribe_HandlesPanic(t *testing.T) { } markInitSyncComplete(t, &r) - var err error - p.Digest, err = r.currentForkDigest() - require.NoError(t, err) + nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch()) + p.Digest = nse.ForkDigest topic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.SignedVoluntaryExit{})] var wg sync.WaitGroup @@ -272,7 +269,7 @@ func TestSubscribe_HandlesPanic(t *testing.T) { r.subscribe(topic, r.noopValidator, func(_ context.Context, msg proto.Message) error { defer wg.Done() panic("bad") - }, p.Digest) + }, nse) r.markForChainStart() p.ReceivePubSub(topic, &pb.SignedVoluntaryExit{Exit: &pb.VoluntaryExit{Epoch: 55}, Signature: make([]byte, fieldparams.BLSSignatureLength)}) @@ -298,12 +295,11 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) { chainStarted: abool.New(), subHandler: newSubTopicHandler(), } - digest, err := r.currentForkDigest() - require.NoError(t, err) + nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch()) params := subscribeParameters{ topicFormat: "/eth2/testing/%#x/committee%d", - digest: digest, + nse: nse, } tracker := newSubnetTracker(params) @@ -326,7 +322,7 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) { require.NoError(t, err) tracker.track(c2, sub2) - r.pruneSubscriptions(tracker, map[uint64]bool{c2: true}) + r.pruneNotWanted(tracker, map[uint64]bool{c2: true}) require.LogsDoNotContain(t, hook, "Could not unregister topic validator") } @@ -483,6 +479,7 @@ func TestFilterSubnetPeers(t *testing.T) { chainStarted: abool.New(), subHandler: newSubTopicHandler(), } + markInitSyncComplete(t, &r) // Empty cache at the end of the test. defer cache.SubnetIDs.EmptyAllCaches() digest, err := r.currentForkDigest() @@ -548,16 +545,16 @@ func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) { chainStarted: abool.New(), subHandler: newSubTopicHandler(), } + markInitSyncComplete(t, &r) // Empty cache at the end of the test. defer cache.SyncSubnetIDs.EmptyAllCaches() slot := r.cfg.clock.CurrentSlot() currEpoch := slots.ToEpoch(slot) cache.SyncSubnetIDs.AddSyncCommitteeSubnets([]byte("pubkey"), currEpoch, []uint64{0, 1}, 10*time.Second) - digest, err := r.currentForkDigest() - assert.NoError(t, err) + nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch()) go r.subscribeWithParameters(subscribeParameters{ topicFormat: p2p.SyncCommitteeSubnetTopicFormat, - digest: digest, + nse: nse, getSubnetsToJoin: r.activeSyncSubnetIndices, }) time.Sleep(2 * time.Second) @@ -566,10 +563,10 @@ func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) { for _, t := range r.cfg.p2p.PubSub().GetTopics() { topicMap[t] = true } - firstSub := fmt.Sprintf(p2p.SyncCommitteeSubnetTopicFormat, digest, 0) + r.cfg.p2p.Encoding().ProtocolSuffix() + firstSub := fmt.Sprintf(p2p.SyncCommitteeSubnetTopicFormat, nse.ForkDigest, 0) + r.cfg.p2p.Encoding().ProtocolSuffix() assert.Equal(t, true, topicMap[firstSub]) - secondSub := fmt.Sprintf(p2p.SyncCommitteeSubnetTopicFormat, digest, 1) + r.cfg.p2p.Encoding().ProtocolSuffix() + secondSub := fmt.Sprintf(p2p.SyncCommitteeSubnetTopicFormat, nse.ForkDigest, 1) + r.cfg.p2p.Encoding().ProtocolSuffix() assert.Equal(t, true, topicMap[secondSub]) cancel() } @@ -600,43 +597,39 @@ func TestSubscribeWithSyncSubnets_DynamicSwitchFork(t *testing.T) { // Empty cache at the end of the test. defer cache.SyncSubnetIDs.EmptyAllCaches() cache.SyncSubnetIDs.AddSyncCommitteeSubnets([]byte("pubkey"), 0, []uint64{0, 1}, 10*time.Second) - digest := params.ForkDigest(r.cfg.clock.CurrentEpoch()) - version, e, err := params.ForkDataFromDigest(digest) - require.NoError(t, err) - require.Equal(t, [4]byte(params.BeaconConfig().DenebForkVersion), version) - require.Equal(t, params.BeaconConfig().DenebForkEpoch, e) + nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch()) + require.Equal(t, [4]byte(params.BeaconConfig().DenebForkVersion), nse.ForkVersion) + require.Equal(t, params.BeaconConfig().DenebForkEpoch, nse.Epoch) sp := newSubnetTracker(subscribeParameters{ topicFormat: p2p.SyncCommitteeSubnetTopicFormat, - digest: digest, + nse: nse, getSubnetsToJoin: r.activeSyncSubnetIndices, }) - require.NoError(t, r.subscribeToSubnets(sp)) + r.trySubscribeSubnets(sp) assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics())) topicMap := map[string]bool{} for _, t := range r.cfg.p2p.PubSub().GetTopics() { topicMap[t] = true } - firstSub := fmt.Sprintf(p2p.SyncCommitteeSubnetTopicFormat, digest, 0) + r.cfg.p2p.Encoding().ProtocolSuffix() + firstSub := fmt.Sprintf(p2p.SyncCommitteeSubnetTopicFormat, nse.ForkDigest, 0) + r.cfg.p2p.Encoding().ProtocolSuffix() assert.Equal(t, true, topicMap[firstSub]) - secondSub := fmt.Sprintf(p2p.SyncCommitteeSubnetTopicFormat, digest, 1) + r.cfg.p2p.Encoding().ProtocolSuffix() + secondSub := fmt.Sprintf(p2p.SyncCommitteeSubnetTopicFormat, nse.ForkDigest, 1) + r.cfg.p2p.Encoding().ProtocolSuffix() assert.Equal(t, true, topicMap[secondSub]) electraSlot, err := slots.EpochStart(params.BeaconConfig().ElectraForkEpoch) require.NoError(t, err) mockNow.SetSlot(t, clock, electraSlot) - digest = params.ForkDigest(r.cfg.clock.CurrentEpoch()) - version, e, err = params.ForkDataFromDigest(digest) - require.NoError(t, err) - require.Equal(t, [4]byte(params.BeaconConfig().ElectraForkVersion), version) - require.Equal(t, params.BeaconConfig().ElectraForkEpoch, e) + nse = params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch()) + require.Equal(t, [4]byte(params.BeaconConfig().ElectraForkVersion), nse.ForkVersion) + require.Equal(t, params.BeaconConfig().ElectraForkEpoch, nse.Epoch) - sp.digest = digest + sp.nse = nse // clear the cache and re-subscribe to subnets. // this should result in the subscriptions being removed cache.SyncSubnetIDs.EmptyAllCaches() - require.NoError(t, r.subscribeToSubnets(sp)) + r.trySubscribeSubnets(sp) assert.Equal(t, 0, len(r.cfg.p2p.PubSub().GetTopics())) } diff --git a/changelog/kasey_idempotent-registration.md b/changelog/kasey_idempotent-registration.md new file mode 100644 index 0000000000..afd2085f95 --- /dev/null +++ b/changelog/kasey_idempotent-registration.md @@ -0,0 +1,2 @@ +### Fixed +- Fixed issue #15738 where separate goroutines assume sole responsibility for topic registration.