From a9d144ad1fe4edd06d10bd54d3f30d10fd235580 Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Thu, 6 Feb 2020 14:14:38 -0600 Subject: [PATCH] Stream Blocks Functionality for RPC (#4771) * stream blocks functionality included * necessary tests for stream blocks and notifier * gazelle and tests passing * gazelle and tests passing * Merge branch 'master' into stream-block * Update beacon-chain/core/feed/block/events.go * Merge refs/heads/master into stream-block * Merge refs/heads/master into stream-block * Merge refs/heads/master into stream-block * Merge refs/heads/master into stream-block * naming * build * Merge refs/heads/master into stream-block * Merge refs/heads/master into stream-block * Merge refs/heads/master into stream-block * fix up tests * Merge branch 'stream-block' of github.com:prysmaticlabs/prysm into stream-block * Merge refs/heads/master into stream-block * shay comment * Merge refs/heads/master into stream-block * Merge branch 'stream-block' of github.com:prysmaticlabs/prysm into stream-block * Merge refs/heads/master into stream-block --- beacon-chain/blockchain/testing/BUILD.bazel | 1 + beacon-chain/blockchain/testing/mock.go | 23 ++++++ beacon-chain/core/feed/block/BUILD.bazel | 15 ++++ beacon-chain/core/feed/block/events.go | 15 ++++ beacon-chain/core/feed/block/notifier.go | 8 +++ beacon-chain/node/node.go | 11 ++- beacon-chain/rpc/BUILD.bazel | 1 + beacon-chain/rpc/beacon/BUILD.bazel | 2 + beacon-chain/rpc/beacon/blocks.go | 33 ++++++++- beacon-chain/rpc/beacon/blocks_test.go | 70 +++++++++++++++++++ beacon-chain/rpc/beacon/server.go | 2 + beacon-chain/rpc/service.go | 6 ++ beacon-chain/rpc/validator/BUILD.bazel | 1 + beacon-chain/rpc/validator/proposer.go | 6 ++ beacon-chain/rpc/validator/proposer_test.go | 6 +- beacon-chain/rpc/validator/server.go | 2 + beacon-chain/sync/BUILD.bazel | 1 + beacon-chain/sync/initial-sync/BUILD.bazel | 1 + beacon-chain/sync/initial-sync/round_robin.go | 6 ++ .../sync/initial-sync/round_robin_test.go | 1 + beacon-chain/sync/initial-sync/service.go | 4 ++ beacon-chain/sync/service.go | 4 ++ beacon-chain/sync/subscriber_beacon_blocks.go | 11 +++ .../sync/subscriber_beacon_blocks_test.go | 8 ++- 24 files changed, 231 insertions(+), 7 deletions(-) create mode 100644 beacon-chain/core/feed/block/BUILD.bazel create mode 100644 beacon-chain/core/feed/block/events.go create mode 100644 beacon-chain/core/feed/block/notifier.go diff --git a/beacon-chain/blockchain/testing/BUILD.bazel b/beacon-chain/blockchain/testing/BUILD.bazel index f1248b1350..d6fb2f1072 100644 --- a/beacon-chain/blockchain/testing/BUILD.bazel +++ b/beacon-chain/blockchain/testing/BUILD.bazel @@ -8,6 +8,7 @@ go_library( visibility = ["//beacon-chain:__subpackages__"], deps = [ "//beacon-chain/core/epoch/precompute:go_default_library", + "//beacon-chain/core/feed/block:go_default_library", "//beacon-chain/core/feed/operation:go_default_library", "//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/helpers:go_default_library", diff --git a/beacon-chain/blockchain/testing/mock.go b/beacon-chain/blockchain/testing/mock.go index 289486db6c..4b5b86f77b 100644 --- a/beacon-chain/blockchain/testing/mock.go +++ b/beacon-chain/blockchain/testing/mock.go @@ -9,6 +9,7 @@ import ( ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" "github.com/prysmaticlabs/prysm/beacon-chain/core/epoch/precompute" + blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" opfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation" statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" @@ -34,6 +35,7 @@ type ChainService struct { Fork *pb.Fork DB db.Database stateNotifier statefeed.Notifier + blockNotifier blockfeed.Notifier opNotifier opfeed.Notifier } @@ -45,6 +47,27 @@ func (ms *ChainService) StateNotifier() statefeed.Notifier { return ms.stateNotifier } +// BlockNotifier mocks the same method in the chain service. +func (ms *ChainService) BlockNotifier() blockfeed.Notifier { + if ms.blockNotifier == nil { + ms.blockNotifier = &MockBlockNotifier{} + } + return ms.blockNotifier +} + +// MockBlockNotifier mocks the block notifier. +type MockBlockNotifier struct { + feed *event.Feed +} + +// BlockFeed returns a block feed. +func (msn *MockBlockNotifier) BlockFeed() *event.Feed { + if msn.feed == nil { + msn.feed = new(event.Feed) + } + return msn.feed +} + // MockStateNotifier mocks the state notifier. type MockStateNotifier struct { feed *event.Feed diff --git a/beacon-chain/core/feed/block/BUILD.bazel b/beacon-chain/core/feed/block/BUILD.bazel new file mode 100644 index 0000000000..c13063239e --- /dev/null +++ b/beacon-chain/core/feed/block/BUILD.bazel @@ -0,0 +1,15 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = [ + "events.go", + "notifier.go", + ], + importpath = "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block", + visibility = ["//beacon-chain:__subpackages__"], + deps = [ + "//shared/event:go_default_library", + "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", + ], +) diff --git a/beacon-chain/core/feed/block/events.go b/beacon-chain/core/feed/block/events.go new file mode 100644 index 0000000000..edf458b1b6 --- /dev/null +++ b/beacon-chain/core/feed/block/events.go @@ -0,0 +1,15 @@ +package block + +import ( + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" +) + +const ( + // ReceivedBlock is sent after a block has been received by the beacon node via p2p or RPC. + ReceivedBlock = iota + 1 +) + +// ReceivedBlockData is the data sent with ReceivedBlock events. +type ReceivedBlockData struct { + SignedBlock *ethpb.SignedBeaconBlock +} diff --git a/beacon-chain/core/feed/block/notifier.go b/beacon-chain/core/feed/block/notifier.go new file mode 100644 index 0000000000..9163bb2daf --- /dev/null +++ b/beacon-chain/core/feed/block/notifier.go @@ -0,0 +1,8 @@ +package block + +import "github.com/prysmaticlabs/prysm/shared/event" + +// Notifier interface defines the methods of the service that provides block updates to consumers. +type Notifier interface { + BlockFeed() *event.Feed +} diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 15dfe54401..6144172362 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -64,6 +64,7 @@ type BeaconNode struct { exitPool *voluntaryexits.Pool depositCache *depositcache.DepositCache stateFeed *event.Feed + blockFeed *event.Feed opFeed *event.Feed forkChoiceStore forkchoice.ForkChoicer } @@ -104,6 +105,7 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) { services: registry, stop: make(chan struct{}), stateFeed: new(event.Feed), + blockFeed: new(event.Feed), opFeed: new(event.Feed), attestationPool: attestations.NewPool(), exitPool: voluntaryexits.NewPool(), @@ -169,6 +171,11 @@ func (b *BeaconNode) StateFeed() *event.Feed { return b.stateFeed } +// BlockFeed implements blockfeed.Notifier. +func (b *BeaconNode) BlockFeed() *event.Feed { + return b.blockFeed +} + // OperationFeed implements opfeed.Notifier. func (b *BeaconNode) OperationFeed() *event.Feed { return b.opFeed @@ -407,6 +414,7 @@ func (b *BeaconNode) registerSyncService(ctx *cli.Context) error { Chain: chainService, InitialSync: initSync, StateNotifier: b, + BlockNotifier: b, AttPool: b.attestationPool, ExitPool: b.exitPool, }) @@ -415,7 +423,6 @@ func (b *BeaconNode) registerSyncService(ctx *cli.Context) error { } func (b *BeaconNode) registerInitialSyncService(ctx *cli.Context) error { - var chainService *blockchain.Service if err := b.services.FetchService(&chainService); err != nil { return err @@ -426,6 +433,7 @@ func (b *BeaconNode) registerInitialSyncService(ctx *cli.Context) error { Chain: chainService, P2P: b.fetchP2P(ctx), StateNotifier: b, + BlockNotifier: b, }) return b.services.RegisterService(is) @@ -495,6 +503,7 @@ func (b *BeaconNode) registerRPCService(ctx *cli.Context) error { SyncService: syncService, DepositFetcher: depositFetcher, PendingDepositFetcher: b.depositCache, + BlockNotifier: b, StateNotifier: b, OperationNotifier: b, SlasherCert: slasherCert, diff --git a/beacon-chain/rpc/BUILD.bazel b/beacon-chain/rpc/BUILD.bazel index 4435228d65..0b9a43368d 100644 --- a/beacon-chain/rpc/BUILD.bazel +++ b/beacon-chain/rpc/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//beacon-chain/blockchain:go_default_library", "//beacon-chain/cache:go_default_library", "//beacon-chain/cache/depositcache:go_default_library", + "//beacon-chain/core/feed/block:go_default_library", "//beacon-chain/core/feed/operation:go_default_library", "//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/db:go_default_library", diff --git a/beacon-chain/rpc/beacon/BUILD.bazel b/beacon-chain/rpc/beacon/BUILD.bazel index 4c79ed5c33..c640ae9a30 100644 --- a/beacon-chain/rpc/beacon/BUILD.bazel +++ b/beacon-chain/rpc/beacon/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//beacon-chain/blockchain:go_default_library", "//beacon-chain/core/epoch/precompute:go_default_library", "//beacon-chain/core/feed:go_default_library", + "//beacon-chain/core/feed/block:go_default_library", "//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/core/state:go_default_library", @@ -57,6 +58,7 @@ go_test( "//beacon-chain/blockchain/testing:go_default_library", "//beacon-chain/core/epoch/precompute:go_default_library", "//beacon-chain/core/feed:go_default_library", + "//beacon-chain/core/feed/block:go_default_library", "//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/db:go_default_library", diff --git a/beacon-chain/rpc/beacon/blocks.go b/beacon-chain/rpc/beacon/blocks.go index 4e8a996bf9..03fdb54da6 100644 --- a/beacon-chain/rpc/beacon/blocks.go +++ b/beacon-chain/rpc/beacon/blocks.go @@ -8,6 +8,7 @@ import ( ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" + blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/db/filters" @@ -174,7 +175,37 @@ func (bs *Server) GetChainHead(ctx context.Context, _ *ptypes.Empty) (*ethpb.Cha // StreamBlocks to clients every single time a block is received by the beacon node. func (bs *Server) StreamBlocks(_ *ptypes.Empty, stream ethpb.BeaconChain_StreamBlocksServer) error { - return status.Error(codes.Unimplemented, "Unimplemented") + blocksChannel := make(chan *feed.Event, 1) + blockSub := bs.BlockNotifier.BlockFeed().Subscribe(blocksChannel) + defer blockSub.Unsubscribe() + for { + select { + case event := <-blocksChannel: + if event.Type == blockfeed.ReceivedBlock { + data, ok := event.Data.(*blockfeed.ReceivedBlockData) + if !ok { + return status.Errorf( + codes.FailedPrecondition, + "Could not subscribe to block feed, received bad data: %v", + data, + ) + } + if data.SignedBlock == nil { + // One nil block shouldn't stop the stream. + continue + } + if err := stream.Send(data.SignedBlock.Block); err != nil { + return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err) + } + } + case <-blockSub.Err(): + return status.Error(codes.Aborted, "Subscriber closed, exiting goroutine") + case <-bs.Ctx.Done(): + return status.Error(codes.Canceled, "Context canceled") + case <-stream.Context().Done(): + return status.Error(codes.Canceled, "Context canceled") + } + } } // StreamChainHead to clients every single time the head block and state of the chain change. diff --git a/beacon-chain/rpc/beacon/blocks_test.go b/beacon-chain/rpc/beacon/blocks_test.go index a3844b77ee..79619fe41c 100644 --- a/beacon-chain/rpc/beacon/blocks_test.go +++ b/beacon-chain/rpc/beacon/blocks_test.go @@ -15,6 +15,7 @@ import ( "github.com/prysmaticlabs/go-ssz" mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" + blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" dbTest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" @@ -589,3 +590,72 @@ func TestServer_StreamChainHead_OnHeadUpdated(t *testing.T) { } <-exitRoutine } + +func TestServer_StreamBlocks_ContextCanceled(t *testing.T) { + db := dbTest.SetupDB(t) + defer dbTest.TeardownDB(t, db) + ctx := context.Background() + + chainService := &mock.ChainService{} + ctx, cancel := context.WithCancel(ctx) + server := &Server{ + Ctx: ctx, + BlockNotifier: chainService.BlockNotifier(), + BeaconDB: db, + } + + exitRoutine := make(chan bool) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockStream := mockRPC.NewMockBeaconChain_StreamBlocksServer(ctrl) + mockStream.EXPECT().Context().Return(ctx) + go func(tt *testing.T) { + if err := server.StreamBlocks(&ptypes.Empty{}, mockStream); !strings.Contains(err.Error(), "Context canceled") { + tt.Errorf("Could not call RPC method: %v", err) + } + <-exitRoutine + }(t) + cancel() + exitRoutine <- true +} + +func TestServer_StreamBlocks_OnHeadUpdated(t *testing.T) { + db := dbTest.SetupDB(t) + defer dbTest.TeardownDB(t, db) + + b := ðpb.SignedBeaconBlock{ + Block: ðpb.BeaconBlock{ + Slot: 1, + }, + } + + chainService := &mock.ChainService{} + ctx := context.Background() + server := &Server{ + Ctx: ctx, + BlockNotifier: chainService.BlockNotifier(), + } + exitRoutine := make(chan bool) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockStream := mockRPC.NewMockBeaconChain_StreamBlocksServer(ctrl) + mockStream.EXPECT().Send(b.Block).Do(func(arg0 interface{}) { + exitRoutine <- true + }) + mockStream.EXPECT().Context().Return(ctx).AnyTimes() + + go func(tt *testing.T) { + if err := server.StreamBlocks(&ptypes.Empty{}, mockStream); err != nil { + tt.Errorf("Could not call RPC method: %v", err) + } + }(t) + + // Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed). + for sent := 0; sent == 0; { + sent = server.BlockNotifier.BlockFeed().Send(&feed.Event{ + Type: blockfeed.ReceivedBlock, + Data: &blockfeed.ReceivedBlockData{SignedBlock: b}, + }) + } + <-exitRoutine +} diff --git a/beacon-chain/rpc/beacon/server.go b/beacon-chain/rpc/beacon/server.go index 91e368ea5d..6b0152dd12 100644 --- a/beacon-chain/rpc/beacon/server.go +++ b/beacon-chain/rpc/beacon/server.go @@ -6,6 +6,7 @@ import ( ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" + blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/beacon-chain/db" "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations" @@ -25,6 +26,7 @@ type Server struct { FinalizationFetcher blockchain.FinalizationFetcher ParticipationFetcher blockchain.ParticipationFetcher StateNotifier statefeed.Notifier + BlockNotifier blockfeed.Notifier Pool attestations.Pool IncomingAttestation chan *ethpb.Attestation CanonicalStateChan chan *pbp2p.BeaconState diff --git a/beacon-chain/rpc/service.go b/beacon-chain/rpc/service.go index 0150ebf13b..9583d3bcef 100644 --- a/beacon-chain/rpc/service.go +++ b/beacon-chain/rpc/service.go @@ -16,6 +16,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache" + blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" opfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation" statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/beacon-chain/db" @@ -81,6 +82,7 @@ type Service struct { depositFetcher depositcache.DepositFetcher pendingDepositFetcher depositcache.PendingDepositsFetcher stateNotifier statefeed.Notifier + blockNotifier blockfeed.Notifier operationNotifier opfeed.Notifier slasherConn *grpc.ClientConn slasherProvider string @@ -116,6 +118,7 @@ type Config struct { SlasherProvider string SlasherCert string StateNotifier statefeed.Notifier + BlockNotifier blockfeed.Notifier OperationNotifier opfeed.Notifier } @@ -151,6 +154,7 @@ func NewService(ctx context.Context, cfg *Config) *Service { canonicalStateChan: make(chan *pbp2p.BeaconState, params.BeaconConfig().DefaultBufferSize), incomingAttestation: make(chan *ethpb.Attestation, params.BeaconConfig().DefaultBufferSize), stateNotifier: cfg.StateNotifier, + blockNotifier: cfg.BlockNotifier, operationNotifier: cfg.OperationNotifier, slasherProvider: cfg.SlasherProvider, slasherCert: cfg.SlasherCert, @@ -217,6 +221,7 @@ func (s *Service) Start() { Eth1InfoFetcher: s.powChainService, SyncChecker: s.syncService, StateNotifier: s.stateNotifier, + BlockNotifier: s.blockNotifier, OperationNotifier: s.operationNotifier, P2P: s.p2p, BlockReceiver: s.blockReceiver, @@ -242,6 +247,7 @@ func (s *Service) Start() { ChainStartFetcher: s.chainStartFetcher, CanonicalStateChan: s.canonicalStateChan, StateNotifier: s.stateNotifier, + BlockNotifier: s.blockNotifier, SlotTicker: ticker, } aggregatorServer := &aggregator.Server{ diff --git a/beacon-chain/rpc/validator/BUILD.bazel b/beacon-chain/rpc/validator/BUILD.bazel index 48e9011977..9e210c93cb 100644 --- a/beacon-chain/rpc/validator/BUILD.bazel +++ b/beacon-chain/rpc/validator/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//beacon-chain/cache/depositcache:go_default_library", "//beacon-chain/core/blocks:go_default_library", "//beacon-chain/core/feed:go_default_library", + "//beacon-chain/core/feed/block:go_default_library", "//beacon-chain/core/feed/operation:go_default_library", "//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/helpers:go_default_library", diff --git a/beacon-chain/rpc/validator/proposer.go b/beacon-chain/rpc/validator/proposer.go index 4857b19f7f..08d28e3d44 100644 --- a/beacon-chain/rpc/validator/proposer.go +++ b/beacon-chain/rpc/validator/proposer.go @@ -10,6 +10,8 @@ import ( ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" + "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" + blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/core/state" "github.com/prysmaticlabs/prysm/beacon-chain/core/state/interop" @@ -105,6 +107,10 @@ func (vs *Server) ProposeBlock(ctx context.Context, blk *ethpb.SignedBeaconBlock } log.WithField("blockRoot", fmt.Sprintf("%#x", bytesutil.Trunc(root[:]))).Debugf( "Block proposal received via RPC") + vs.BlockNotifier.BlockFeed().Send(&feed.Event{ + Type: blockfeed.ReceivedBlock, + Data: blockfeed.ReceivedBlockData{SignedBlock: blk}, + }) if err := vs.BlockReceiver.ReceiveBlock(ctx, blk); err != nil { return nil, status.Errorf(codes.Internal, "Could not process beacon block: %v", err) } diff --git a/beacon-chain/rpc/validator/proposer_test.go b/beacon-chain/rpc/validator/proposer_test.go index d3d484955e..9286d2d4f6 100644 --- a/beacon-chain/rpc/validator/proposer_test.go +++ b/beacon-chain/rpc/validator/proposer_test.go @@ -130,13 +130,15 @@ func TestProposeBlock_OK(t *testing.T) { t.Fatalf("Could not save genesis state: %v", err) } + c := &mock.ChainService{} proposerServer := &Server{ BeaconDB: db, ChainStartFetcher: &mockPOW.POWChain{}, Eth1InfoFetcher: &mockPOW.POWChain{}, Eth1BlockFetcher: &mockPOW.POWChain{}, - BlockReceiver: &mock.ChainService{}, - HeadFetcher: &mock.ChainService{}, + BlockReceiver: c, + HeadFetcher: c, + BlockNotifier: c.BlockNotifier(), } req := ðpb.SignedBeaconBlock{ Block: ðpb.BeaconBlock{ diff --git a/beacon-chain/rpc/validator/server.go b/beacon-chain/rpc/validator/server.go index f6bafcfcde..ea6844bf7f 100644 --- a/beacon-chain/rpc/validator/server.go +++ b/beacon-chain/rpc/validator/server.go @@ -10,6 +10,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" + blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" opfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation" statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" @@ -50,6 +51,7 @@ type Server struct { Eth1InfoFetcher powchain.ChainInfoFetcher SyncChecker sync.Checker StateNotifier statefeed.Notifier + BlockNotifier blockfeed.Notifier P2P p2p.Broadcaster AttPool attestations.Pool ExitPool *voluntaryexits.Pool diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 5345987d19..84d11975b7 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -36,6 +36,7 @@ go_library( "//beacon-chain/blockchain:go_default_library", "//beacon-chain/core/blocks:go_default_library", "//beacon-chain/core/feed:go_default_library", + "//beacon-chain/core/feed/block:go_default_library", "//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/core/state:go_default_library", diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index a753a6eb83..fd4ad1df90 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -12,6 +12,7 @@ go_library( deps = [ "//beacon-chain/blockchain:go_default_library", "//beacon-chain/core/feed:go_default_library", + "//beacon-chain/core/feed/block:go_default_library", "//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/db:go_default_library", diff --git a/beacon-chain/sync/initial-sync/round_robin.go b/beacon-chain/sync/initial-sync/round_robin.go index 5b44bb30bd..0632ad069b 100644 --- a/beacon-chain/sync/initial-sync/round_robin.go +++ b/beacon-chain/sync/initial-sync/round_robin.go @@ -13,6 +13,8 @@ import ( "github.com/paulbellamy/ratecounter" "github.com/pkg/errors" eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" + blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/flags" prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync" @@ -210,6 +212,10 @@ func (s *Service) roundRobinSync(genesis time.Time) error { log.Debugf("Beacon node doesn't have a block in db with root %#x", blk.Block.ParentRoot) continue } + s.blockNotifier.BlockFeed().Send(&feed.Event{ + Type: blockfeed.ReceivedBlock, + Data: blockfeed.ReceivedBlockData{SignedBlock: blk}, + }) if featureconfig.Get().InitSyncNoVerify { if err := s.chain.ReceiveBlockNoVerify(ctx, blk); err != nil { return err diff --git a/beacon-chain/sync/initial-sync/round_robin_test.go b/beacon-chain/sync/initial-sync/round_robin_test.go index 821ea147ba..accd0b23db 100644 --- a/beacon-chain/sync/initial-sync/round_robin_test.go +++ b/beacon-chain/sync/initial-sync/round_robin_test.go @@ -262,6 +262,7 @@ func TestRoundRobinSync(t *testing.T) { } // no-op mock s := &Service{ chain: mc, + blockNotifier: mc.BlockNotifier(), p2p: p, db: beaconDB, synced: false, diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index d25fa333b7..122822ff89 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -8,6 +8,7 @@ import ( "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" + blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/db" @@ -38,6 +39,7 @@ type Config struct { DB db.ReadOnlyDatabase Chain blockchainService StateNotifier statefeed.Notifier + BlockNotifier blockfeed.Notifier } // Service service. @@ -49,6 +51,7 @@ type Service struct { synced bool chainStarted bool stateNotifier statefeed.Notifier + blockNotifier blockfeed.Notifier blocksRateLimiter *leakybucket.Collector } @@ -61,6 +64,7 @@ func NewInitialSync(cfg *Config) *Service { p2p: cfg.P2P, db: cfg.DB, stateNotifier: cfg.StateNotifier, + blockNotifier: cfg.BlockNotifier, blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksPerSecond, false /* deleteEmptyBuckets */), } } diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index ecee04e0b9..cfcfba1ae2 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -8,6 +8,7 @@ import ( "github.com/pkg/errors" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" + blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/beacon-chain/db" "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations" @@ -30,6 +31,7 @@ type Config struct { Chain blockchainService InitialSync Checker StateNotifier statefeed.Notifier + BlockNotifier blockfeed.Notifier } // This defines the interface for interacting with block chain service @@ -58,6 +60,7 @@ func NewRegularSync(cfg *Config) *Service { seenPendingBlocks: make(map[[32]byte]bool), blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof), stateNotifier: cfg.StateNotifier, + blockNotifier: cfg.BlockNotifier, blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */), } @@ -86,6 +89,7 @@ type Service struct { initialSync Checker validateBlockLock sync.RWMutex stateNotifier statefeed.Notifier + blockNotifier blockfeed.Notifier blocksRateLimiter *leakybucket.Collector } diff --git a/beacon-chain/sync/subscriber_beacon_blocks.go b/beacon-chain/sync/subscriber_beacon_blocks.go index ef7c24d52a..ce80139857 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks.go +++ b/beacon-chain/sync/subscriber_beacon_blocks.go @@ -7,6 +7,8 @@ import ( "github.com/gogo/protobuf/proto" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" + blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/core/state/interop" "github.com/prysmaticlabs/prysm/shared/bytesutil" @@ -52,6 +54,15 @@ func (r *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message) return nil } + // Broadcast the block on a feed to notify other services in the beacon node + // of a received block (even if it does not process correctly through a state transition). + r.blockNotifier.BlockFeed().Send(&feed.Event{ + Type: blockfeed.ReceivedBlock, + Data: &blockfeed.ReceivedBlockData{ + SignedBlock: signed, + }, + }) + err = r.chain.ReceiveBlockNoPubsub(ctx, signed) if err != nil { interop.WriteBlockToDisk(signed, true /*failed*/) diff --git a/beacon-chain/sync/subscriber_beacon_blocks_test.go b/beacon-chain/sync/subscriber_beacon_blocks_test.go index ad21ba528f..dbfe3ad8de 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks_test.go +++ b/beacon-chain/sync/subscriber_beacon_blocks_test.go @@ -40,10 +40,12 @@ func TestRegularSyncBeaconBlockSubscriber_FilterByFinalizedEpoch(t *testing.T) { t.Fatal(err) } parentRoot, _ := ssz.HashTreeRoot(parent.Block) + chain := &mock.ChainService{State: s} r := &Service{ - db: db, - chain: &mock.ChainService{State: s}, - attPool: attestations.NewPool(), + db: db, + chain: chain, + blockNotifier: chain.BlockNotifier(), + attPool: attestations.NewPool(), } b := ðpb.SignedBeaconBlock{