diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index ff0a27ef43..3fc7c942c1 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -277,7 +277,7 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco } log.Debugln("Registering Sync Service") - if err := beacon.registerSyncService(beacon.initialSyncComplete); err != nil { + if err := beacon.registerSyncService(beacon.initialSyncComplete, bfs); err != nil { return nil, err } @@ -719,7 +719,7 @@ func (b *BeaconNode) registerPOWChainService() error { return b.services.RegisterService(web3Service) } -func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}) error { +func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFillStore *backfill.Store) error { var web3Service *execution.Service if err := b.services.FetchService(&web3Service); err != nil { return err @@ -758,6 +758,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}) erro regularsync.WithStateNotifier(b), regularsync.WithBlobStorage(b.BlobStorage), regularsync.WithVerifierWaiter(b.verifyInitWaiter), + regularsync.WithAvailableBlocker(bFillStore), ) return b.services.RegisterService(rs) } diff --git a/beacon-chain/p2p/types/rpc_errors.go b/beacon-chain/p2p/types/rpc_errors.go index 9fb7d6e6b4..6bed696b16 100644 --- a/beacon-chain/p2p/types/rpc_errors.go +++ b/beacon-chain/p2p/types/rpc_errors.go @@ -14,4 +14,5 @@ var ( ErrInvalidRequest = errors.New("invalid range, step or count") ErrBlobLTMinRequest = errors.New("blob slot < minimum_request_epoch") ErrMaxBlobReqExceeded = errors.New("requested more than MAX_REQUEST_BLOB_SIDECARS") + ErrResourceUnavailable = errors.New("resource requested unavailable") ) diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 266ae26ae8..051aca07c4 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -90,6 +90,7 @@ go_library( "//beacon-chain/startup:go_default_library", "//beacon-chain/state:go_default_library", "//beacon-chain/state/stategen:go_default_library", + "//beacon-chain/sync/backfill/coverage:go_default_library", "//beacon-chain/sync/verify:go_default_library", "//beacon-chain/verification:go_default_library", "//cache/lru:go_default_library", diff --git a/beacon-chain/sync/options.go b/beacon-chain/sync/options.go index cba850c8a3..e605fd2c84 100644 --- a/beacon-chain/sync/options.go +++ b/beacon-chain/sync/options.go @@ -16,6 +16,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/backfill/coverage" "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" ) @@ -170,3 +171,12 @@ func WithVerifierWaiter(v *verification.InitializerWaiter) Option { return nil } } + +// WithAvailableBlocker allows the sync package to access the current +// status of backfill. +func WithAvailableBlocker(avb coverage.AvailableBlocker) Option { + return func(s *Service) error { + s.availableBlocker = avb + return nil + } +} diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range.go b/beacon-chain/sync/rpc_beacon_blocks_by_range.go index 3e7f703827..30cfddfbdf 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range.go @@ -38,6 +38,13 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa tracing.AnnotateError(span, err) return err } + available := s.validateRangeAvailability(rp) + if !available { + log.Debug("error in validating range availability") + s.writeErrorResponseToStream(responseCodeResourceUnavailable, p2ptypes.ErrResourceUnavailable.Error(), stream) + tracing.AnnotateError(span, err) + return nil + } blockLimiter, err := s.rateLimiter.topicCollector(string(stream.Protocol())) if err != nil { @@ -126,6 +133,11 @@ func validateRangeRequest(r *pb.BeaconBlocksByRangeRequest, current primitives.S return rp, nil } +func (s *Service) validateRangeAvailability(rp rangeParams) bool { + startBlock := rp.start + return s.availableBlocker.AvailableBlock(startBlock) +} + func (s *Service) writeBlockBatchToStream(ctx context.Context, batch blockBatch, stream libp2pcore.Stream) error { ctx, span := trace.StartSpan(ctx, "sync.WriteBlockRangeToStream") defer span.End() diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go index 0d44edc4c0..a71950baa4 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go @@ -64,7 +64,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerReturnsBlocks(t *testing.T) { clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) // Start service with 160 as allowed blocks capacity (and almost zero capacity recovery). - r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} + r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)} pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), time.Second, false) @@ -127,7 +127,7 @@ func TestRPCBeaconBlocksByRange_ReturnCorrectNumberBack(t *testing.T) { clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) // Start service with 160 as allowed blocks capacity (and almost zero capacity recovery). - r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)} + r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)} pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), time.Second, false) @@ -244,7 +244,8 @@ func TestRPCBeaconBlocksByRange_ReconstructsPayloads(t *testing.T) { clock: clock, executionPayloadReconstructor: mockEngine, }, - rateLimiter: newRateLimiter(p1), + rateLimiter: newRateLimiter(p1), + availableBlocker: mockBlocker{avail: true}, } pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) @@ -314,7 +315,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerReturnsSortedBlocks(t *testing.T) { clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) // Start service with 160 as allowed blocks capacity (and almost zero capacity recovery). - r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} + r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)} pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), time.Second, false) @@ -380,7 +381,7 @@ func TestRPCBeaconBlocksByRange_ReturnsGenesisBlock(t *testing.T) { } clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) - r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} + r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)} pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, time.Second, false) @@ -472,7 +473,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) { capacity := int64(flags.Get().BlockBatchLimit * 3) clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) - r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)} + r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)} pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) @@ -499,7 +500,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) { capacity := int64(flags.Get().BlockBatchLimit * 3) clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) - r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} + r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)} pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) @@ -530,7 +531,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) { capacity := int64(flags.Get().BlockBatchLimit * flags.Get().BlockBatchLimitBurstFactor) clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) - r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} + r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)} pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, time.Second, false) @@ -722,7 +723,7 @@ func TestRPCBeaconBlocksByRange_EnforceResponseInvariants(t *testing.T) { assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) - r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)} + r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)} r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false) req := ðpb.BeaconBlocksByRangeRequest{ StartSlot: 448, @@ -891,7 +892,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) { assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) - r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} + r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)} r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false) req := ðpb.BeaconBlocksByRangeRequest{ StartSlot: 1, @@ -923,7 +924,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) { assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) - r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} + r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)} r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false) req := ðpb.BeaconBlocksByRangeRequest{ StartSlot: 1, @@ -958,7 +959,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) { p1.Connect(p2) assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) - r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)} + r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)} r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false) req := ðpb.BeaconBlocksByRangeRequest{ StartSlot: 1, @@ -994,7 +995,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) { assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) - r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)} + r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)} r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false) req := ðpb.BeaconBlocksByRangeRequest{ StartSlot: 1, @@ -1035,7 +1036,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) { assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) - r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)} + r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)} r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false) req := ðpb.BeaconBlocksByRangeRequest{ StartSlot: 1, @@ -1105,3 +1106,11 @@ func TestRPCBeaconBlocksByRange_FilterBlocks_PreviousRoot(t *testing.T) { // pointer should reference a new root. require.NotEqual(t, cf.prevRoot, [32]byte{}) } + +type mockBlocker struct { + avail bool +} + +func (m mockBlocker) AvailableBlock(_ primitives.Slot) bool { + return m.avail +} diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index bc66448981..2cb280a92f 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -33,6 +33,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/backfill/coverage" "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" lruwrpr "github.com/prysmaticlabs/prysm/v4/cache/lru" "github.com/prysmaticlabs/prysm/v4/config/params" @@ -155,6 +156,7 @@ type Service struct { initialSyncComplete chan struct{} verifierWaiter *verification.InitializerWaiter newBlobVerifier verification.NewBlobVerifier + availableBlocker coverage.AvailableBlocker } // NewService initializes new regular sync service.