// Package sync includes all chain-synchronization logic for the beacon node,
// including gossip-sub validators for blocks, attestations, and other p2p
// messages, as well as the ability to process and respond to block requests
// by peers.
package sync

import (
	"context"
	"sync"
	"time"

	"github.com/OffchainLabs/prysm/v7/async"
	"github.com/OffchainLabs/prysm/v7/async/abool"
	"github.com/OffchainLabs/prysm/v7/async/event"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/cache"
	blockfeed "github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/block"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/operation"
	statefeed "github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/state"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/db"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/db/filesystem"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/execution"
	lightClient "github.com/OffchainLabs/prysm/v7/beacon-chain/light-client"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/attestations"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/blstoexec"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/slashings"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/synccommittee"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/voluntaryexits"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
	p2ptypes "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/types"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/startup"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stategen"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/sync/backfill/coverage"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/verification"
	lruwrpr "github.com/OffchainLabs/prysm/v7/cache/lru"
	"github.com/OffchainLabs/prysm/v7/config/params"
	"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
	"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
	leakybucket "github.com/OffchainLabs/prysm/v7/container/leaky-bucket"
	"github.com/OffchainLabs/prysm/v7/crypto/rand"
	"github.com/OffchainLabs/prysm/v7/runtime"
	prysmTime "github.com/OffchainLabs/prysm/v7/time"
	"github.com/OffchainLabs/prysm/v7/time/slots"
	lru "github.com/hashicorp/golang-lru"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	libp2pcore "github.com/libp2p/go-libp2p/core"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	gcache "github.com/patrickmn/go-cache"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"github.com/trailofbits/go-mutexasserts"
	"golang.org/x/sync/singleflight"
)

var _ runtime.Service = (*Service)(nil)

const (
	rangeLimit               uint64 = 1024
	seenBlockSize                   = 1000
	seenDataColumnSize              = seenBlockSize * 128 // Each block can have max 128 data columns.
	seenUnaggregatedAttSize         = 20000
	seenAggregatedAttSize           = 16384
	seenSyncMsgSize                 = 1000 // Maximum of 512 sync committee members, 1000 is a safe amount.
	seenSyncContributionSize        = 512  // Maximum of SYNC_COMMITTEE_SIZE as specified by the spec.
	seenExitSize                    = 100
	seenProposerSlashingSize        = 100
	badBlockSize                    = 1000
	syncMetricsInterval             = 10 * time.Second
)

var (
	// Duration of one epoch.
	pendingBlockExpTime = time.Duration(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot)) * time.Second
	// Time to allow processing early blocks.
	earlyBlockProcessingTolerance = slots.MultiplySlotBy(2)
	// Time to allow processing early attestations.
	earlyAttestationProcessingTolerance = params.BeaconConfig().MaximumGossipClockDisparityDuration()

	errWrongMessage = errors.New("wrong pubsub message")
	errNilMessage   = errors.New("nil pubsub message")
)
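// For illustration (mainnet parameters, used here only as a worked example):
// with 32 slots per epoch and 12 seconds per slot, pendingBlockExpTime works
// out to 32 * 12 = 384 seconds, i.e. a pending block stays cached for at most
// one epoch before being evicted.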
// Common type for functional p2p validation options.
type validationFn func(ctx context.Context) (pubsub.ValidationResult, error)

// config to hold dependencies for the sync service.
type config struct {
	attestationNotifier     operation.Notifier
	p2p                     p2p.P2P
	beaconDB                db.NoHeadAccessDatabase
	attestationCache        *cache.AttestationCache
	attPool                 attestations.Pool
	exitPool                voluntaryexits.PoolManager
	slashingPool            slashings.PoolManager
	syncCommsPool           synccommittee.Pool
	blsToExecPool           blstoexec.PoolManager
	chain                   blockchainService
	initialSync             Checker
	blockNotifier           blockfeed.Notifier
	operationNotifier       operation.Notifier
	executionReconstructor  execution.Reconstructor
	stateGen                *stategen.State
	slasherAttestationsFeed *event.Feed
	slasherBlockHeadersFeed *event.Feed
	clock                   *startup.Clock
	stateNotifier           statefeed.Notifier
	blobStorage             *filesystem.BlobStorage
	dataColumnStorage       *filesystem.DataColumnStorage
	batchVerifierLimit      int
}

// blockchainService defines the interface for interacting with the blockchain service.
type blockchainService interface {
	blockchain.BlockReceiver
	blockchain.BlobReceiver
	blockchain.DataColumnReceiver
	blockchain.HeadFetcher
	blockchain.FinalizationFetcher
	blockchain.ForkFetcher
	blockchain.AttestationReceiver
	blockchain.TimeFetcher
	blockchain.GenesisFetcher
	blockchain.CanonicalFetcher
	blockchain.OptimisticModeFetcher
	blockchain.SlashingReceiver
	blockchain.ForkchoiceFetcher
}

// Service is responsible for handling all run time p2p related operations as the
// main entry point for network messages.
type Service struct {
	cfg                              *config
	ctx                              context.Context
	cancel                           context.CancelFunc
	slotToPendingBlocks              *gcache.Cache
	seenPendingBlocks                map[[32]byte]bool
	blkRootToPendingAtts             map[[32]byte][]any
	subHandler                       *subTopicHandler
	pendingAttsLock                  sync.RWMutex
	pendingQueueLock                 sync.RWMutex
	chainStarted                     *abool.AtomicBool
	validateBlockLock                sync.RWMutex
	rateLimiter                      *limiter
	seenBlockLock                    sync.RWMutex
	seenBlockCache                   *lru.Cache
	seenBlobLock                     sync.RWMutex
	seenBlobCache                    *lru.Cache
	seenDataColumnCache              *slotAwareCache
	seenAggregatedAttestationLock    sync.RWMutex
	seenAggregatedAttestationCache   *lru.Cache
	seenUnAggregatedAttestationLock  sync.RWMutex
	seenUnAggregatedAttestationCache *lru.Cache
	seenExitLock                     sync.RWMutex
	seenExitCache                    *lru.Cache
	seenProposerSlashingLock         sync.RWMutex
	seenProposerSlashingCache        *lru.Cache
	seenAttesterSlashingLock         sync.RWMutex
	seenAttesterSlashingCache        map[uint64]bool
	seenSyncMessageLock              sync.RWMutex
	seenSyncMessageCache             *lru.Cache
	seenSyncContributionLock         sync.RWMutex
	seenSyncContributionCache        *lru.Cache
	badBlockCache                    *lru.Cache
	badBlockLock                     sync.RWMutex
	syncContributionBitsOverlapLock  sync.RWMutex
	syncContributionBitsOverlapCache *lru.Cache
	signatureChan                    chan *signatureVerifier
	kzgChan                          chan *kzgVerifier
	clockWaiter                      startup.ClockWaiter
	initialSyncComplete              chan struct{}
	verifierWaiter                   *verification.InitializerWaiter
	newBlobVerifier                  verification.NewBlobVerifier
	newColumnsVerifier               verification.NewDataColumnsVerifier
	columnSidecarsExecSingleFlight   singleflight.Group
	reconstructionSingleFlight       singleflight.Group
	availableBlocker                 coverage.AvailableBlocker
	reconstructionRandGen            *rand.Rand
	trackedValidatorsCache           *cache.TrackedValidatorsCache
	ctxMap                           ContextByteVersions
	slasherEnabled                   bool
	lcStore                          *lightClient.Store
	dataColumnLogCh                  chan dataColumnLogEntry
	digestActions                    perDigestSet
	subscriptionSpawner              func(func()) // See Service.spawn for details.
}
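// Illustrative sketch (hypothetical helper, not part of this package): the
// validationFn type above composes naturally, with the first non-accepting
// result short-circuiting the chain. Gossip validators that layer multiple
// checks follow this shape.
//
//	func composeValidations(ctx context.Context, fns ...validationFn) (pubsub.ValidationResult, error) {
//		for _, fn := range fns {
//			if res, err := fn(ctx); res != pubsub.ValidationAccept || err != nil {
//				return res, err
//			}
//		}
//		return pubsub.ValidationAccept, nil
//	}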
// NewService initializes a new regular sync service.
func NewService(ctx context.Context, opts ...Option) *Service {
	ctx, cancel := context.WithCancel(ctx)
	r := &Service{
		ctx:                   ctx,
		cancel:                cancel,
		chainStarted:          abool.New(),
		cfg:                   &config{clock: startup.NewClock(time.Unix(0, 0), [32]byte{})},
		slotToPendingBlocks:   gcache.New(pendingBlockExpTime /* exp time */, 0 /* disable janitor */),
		seenPendingBlocks:     make(map[[32]byte]bool),
		blkRootToPendingAtts:  make(map[[32]byte][]any),
		dataColumnLogCh:       make(chan dataColumnLogEntry, 1000),
		reconstructionRandGen: rand.NewGenerator(),
	}

	for _, opt := range opts {
		if err := opt(r); err != nil {
			log.WithError(err).Error("Could not apply option to sync service")
			return nil
		}
	}

	// Initialize the signature channel with the configured limit.
	r.signatureChan = make(chan *signatureVerifier, r.cfg.batchVerifierLimit)

	// Initialize the KZG channel with a fixed buffer size of 100.
	// This buffer size is designed to handle burst traffic of data column gossip messages:
	// - Data columns arrive less frequently than attestations (default batchVerifierLimit=1000).
	r.kzgChan = make(chan *kzgVerifier, 100)

	// When a pending block is evicted from the cache, remove it from our seen pending block map.
	// The eviction method always assumes that the mutex is held.
	r.slotToPendingBlocks.OnEvicted(func(s string, i any) {
		if !mutexasserts.RWMutexLocked(&r.pendingQueueLock) {
			log.Errorf("Mutex is not locked during cache eviction of values")
			// Continue on to allow elements to be properly removed.
		}
		blks, ok := i.([]interfaces.ReadOnlySignedBeaconBlock)
		if !ok {
			log.Errorf("Invalid type retrieved from the cache: %T", i)
			return
		}
		for _, b := range blks {
			root, err := b.Block().HashTreeRoot()
			if err != nil {
				log.WithError(err).Error("Could not calculate htr of block")
				continue
			}
			delete(r.seenPendingBlocks, root)
		}
	})

	r.subHandler = newSubTopicHandler()
	r.rateLimiter = newRateLimiter(r.cfg.p2p)
	r.initCaches()

	return r
}

func newBlobVerifierFromInitializer(ini *verification.Initializer) verification.NewBlobVerifier {
	return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier {
		return ini.NewBlobVerifier(b, reqs)
	}
}

func newDataColumnsVerifierFromInitializer(ini *verification.Initializer) verification.NewDataColumnsVerifier {
	return func(roDataColumns []blocks.RODataColumn, reqs []verification.Requirement) verification.DataColumnsVerifier {
		return ini.NewDataColumnsVerifier(roDataColumns, reqs)
	}
}

// Start the regular sync service.
func (s *Service) Start() {
	v, err := s.verifierWaiter.WaitForInitializer(s.ctx)
	if err != nil {
		log.WithError(err).Error("Could not get verification initializer")
		return
	}
	s.newBlobVerifier = newBlobVerifierFromInitializer(v)
	s.newColumnsVerifier = newDataColumnsVerifierFromInitializer(v)

	go s.verifierRoutine()
	go s.kzgVerifierRoutine()
	go s.startDiscoveryAndSubscriptions()
	go s.processDataColumnLogs()

	s.cfg.p2p.AddConnectionHandler(s.reValidatePeer, s.sendGoodbye)
	s.cfg.p2p.AddDisconnectionHandler(func(_ context.Context, _ peer.ID) error {
		// no-op
		return nil
	})
	s.cfg.p2p.AddPingMethod(s.sendPingRequest)
	s.processPendingBlocksQueue()
	s.maintainPeerStatuses()
	if params.FuluEnabled() {
		s.maintainCustodyInfo()
	}
	s.resyncIfBehind()

	// Update sync metrics.
	async.RunEvery(s.ctx, syncMetricsInterval, s.updateMetrics)
	// Periodically prune the data column cache against the latest finalized checkpoint.
	async.RunEvery(s.ctx, 30*time.Second, s.pruneDataColumnCache)
}
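// Usage sketch (illustrative wiring; the option constructors shown follow this
// package's Option pattern but are examples, not a complete list):
//
//	svc := NewService(ctx, WithP2P(p2pService), WithChainService(chainService))
//	if svc == nil {
//		// An option failed to apply; the error was logged above.
//	}
//	go svc.Start()
//	defer func() {
//		if err := svc.Stop(); err != nil {
//			log.WithError(err).Error("Failed to stop sync service")
//		}
//	}()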
// Stop the regular sync service.
func (s *Service) Stop() error {
	defer func() {
		s.cancel()
		if s.rateLimiter != nil {
			s.rateLimiter.free()
		}
	}()

	// Create a context with a timeout to prevent hanging.
	goodbyeCtx, cancel := context.WithTimeout(s.ctx, 10*time.Second)
	defer cancel()

	// Use a WaitGroup to ensure all goodbye messages complete.
	var wg sync.WaitGroup
	for _, peerID := range s.cfg.p2p.Peers().Connected() {
		if s.cfg.p2p.Host().Network().Connectedness(peerID) == network.Connected {
			wg.Add(1)
			go func(pid peer.ID) {
				defer wg.Done()
				if err := s.sendGoodByeAndDisconnect(goodbyeCtx, p2ptypes.GoodbyeCodeClientShutdown, pid); err != nil {
					log.WithError(err).WithField("peerID", pid).Error("Failed to send goodbye message")
				}
			}(peerID)
		}
	}
	wg.Wait()
	log.Debug("All goodbye messages sent")

	// Now safe to remove handlers / unsubscribe.
	for _, p := range s.cfg.p2p.Host().Mux().Protocols() {
		s.cfg.p2p.Host().RemoveStreamHandler(p)
	}
	for _, t := range s.cfg.p2p.PubSub().GetTopics() {
		s.unSubscribeFromTopic(t)
	}
	return nil
}

// Status of the currently running regular sync service.
func (s *Service) Status() error {
	// If our head slot is in a previous epoch while our peers report head blocks
	// in the most recent epoch, then we might be out of sync.
	if headEpoch := slots.ToEpoch(s.cfg.chain.HeadSlot()); headEpoch+1 < slots.ToEpoch(s.cfg.clock.CurrentSlot()) &&
		headEpoch+1 < s.cfg.p2p.Peers().HighestEpoch() {
		return errors.New("out of sync")
	}
	return nil
}

// This initializes the caches to update seen beacon objects coming in from the wire
// and prevent DoS.
func (s *Service) initCaches() {
	s.seenBlockCache = lruwrpr.New(seenBlockSize)
	s.seenBlobCache = lruwrpr.New(seenBlockSize * params.BeaconConfig().DeprecatedMaxBlobsPerBlockElectra)
	s.seenDataColumnCache = newSlotAwareCache(seenDataColumnSize)
	s.seenAggregatedAttestationCache = lruwrpr.New(seenAggregatedAttSize)
	s.seenUnAggregatedAttestationCache = lruwrpr.New(seenUnaggregatedAttSize)
	s.seenSyncMessageCache = lruwrpr.New(seenSyncMsgSize)
	s.seenSyncContributionCache = lruwrpr.New(seenSyncContributionSize)
	s.syncContributionBitsOverlapCache = lruwrpr.New(seenSyncContributionSize)
	s.seenExitCache = lruwrpr.New(seenExitSize)
	s.seenAttesterSlashingCache = make(map[uint64]bool)
	s.seenProposerSlashingCache = lruwrpr.New(seenProposerSlashingSize)
	s.badBlockCache = lruwrpr.New(badBlockSize)
}

func (s *Service) waitForChainStart() {
	clock, err := s.clockWaiter.WaitForClock(s.ctx)
	if err != nil {
		log.WithError(err).Error("Sync service failed to receive genesis data")
		return
	}
	s.cfg.clock = clock
	startTime := clock.GenesisTime()
	log.WithField("startTime", startTime).Debug("Received state initialized event")

	ctxMap, err := ContextByteVersionsForValRoot(clock.GenesisValidatorsRoot())
	if err != nil {
		log.
			WithError(err).
			WithField("genesisValidatorRoot", clock.GenesisValidatorsRoot()).
			Error("Sync service failed to initialize context version map")
		return
	}
	s.ctxMap = ctxMap

	// We need to register RPC handlers ASAP so that we can handle incoming status message
	// requests from peers.
	nse := params.GetNetworkScheduleEntry(clock.CurrentEpoch())
	if err := s.registerRPCHandlers(nse); err != nil {
		// If we fail here, we won't be able to peer with anyone because we can't handle their status messages.
		log.WithError(err).Error("Failed to register RPC handlers")
		// TODO: need ability to bubble the error up to the top of the node init tree and exit safely.
		return
	}

	// Block until the chain start time is reached.
	if startTime.After(prysmTime.Now()) {
		time.Sleep(prysmTime.Until(startTime))
	}
	log.WithField("startTime", startTime).Debug("Chain started in sync service")
	s.markForChainStart()
}
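// Worked example for the Status check above (illustrative mainnet numbers):
// with 32 slots per epoch, a head slot of 320 puts headEpoch at 10. If the
// local clock and the best peer both report epoch 12, then headEpoch+1 = 11
// is below both and Status returns "out of sync"; at epoch 11 the check still
// passes, so one full epoch of lag is tolerated.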
func (s *Service) startDiscoveryAndSubscriptions() {
	// Wait for the chain to start.
	s.waitForChainStart()

	if s.ctx.Err() != nil {
		log.Debug("Context closed, exiting startDiscoveryAndSubscriptions")
		return
	}

	// Start the control loop that registers p2p handlers and watches for forks.
	go s.p2pHandlerControlLoop()
}

func (s *Service) writeErrorResponseToStream(responseCode byte, reason string, stream libp2pcore.Stream) {
	writeErrorResponseToStream(responseCode, reason, stream, s.cfg.p2p)
}

func (s *Service) setRateCollector(topic string, c *leakybucket.Collector) {
	s.rateLimiter.limiterMap[topic] = c
}

// markForChainStart marks the chain as having started.
func (s *Service) markForChainStart() {
	s.chainStarted.Set()
}

// pruneDataColumnCache removes entries from the data column cache that are older than the finalized slot.
func (s *Service) pruneDataColumnCache() {
	finalizedCheckpoint := s.cfg.chain.FinalizedCheckpt()
	finalizedSlot, err := slots.EpochStart(finalizedCheckpoint.Epoch)
	if err != nil {
		log.WithError(err).Error("Could not calculate finalized slot for cache pruning")
		return
	}

	pruned := s.seenDataColumnCache.pruneSlotsBefore(finalizedSlot)
	if pruned > 0 {
		log.WithFields(logrus.Fields{
			"finalizedSlot": finalizedSlot,
			"prunedEntries": pruned,
		}).Debug("Pruned data column cache entries before finalized slot")
	}
}

func (s *Service) chainIsStarted() bool {
	return s.chainStarted.IsSet()
}

func (s *Service) waitForInitialSync(ctx context.Context) error {
	select {
	case <-s.initialSyncComplete:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Checker defines a struct which can verify whether a node is currently
// synchronizing a chain with the rest of peers in the network.
type Checker interface {
	Initialized() bool
	Syncing() bool
	Synced() bool
	Status() error
	Resync() error
}
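// A minimal stub satisfying Checker (illustrative only, e.g. for tests; not
// part of the production wiring):
//
//	type staticChecker struct{ synced bool }
//
//	func (c staticChecker) Initialized() bool { return true }
//	func (c staticChecker) Syncing() bool     { return !c.synced }
//	func (c staticChecker) Synced() bool      { return c.synced }
//	func (c staticChecker) Status() error     { return nil }
//	func (c staticChecker) Resync() error     { return nil }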