mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
Stream Blocks Functionality for RPC (#4771)
* stream blocks functionality included * necessary tests for stream blocks and notifier * gazelle and tests passing * gazelle and tests passing * Merge branch 'master' into stream-block * Update beacon-chain/core/feed/block/events.go * Merge refs/heads/master into stream-block * Merge refs/heads/master into stream-block * Merge refs/heads/master into stream-block * Merge refs/heads/master into stream-block * naming * build * Merge refs/heads/master into stream-block * Merge refs/heads/master into stream-block * Merge refs/heads/master into stream-block * fix up tests * Merge branch 'stream-block' of github.com:prysmaticlabs/prysm into stream-block * Merge refs/heads/master into stream-block * shay comment * Merge refs/heads/master into stream-block * Merge branch 'stream-block' of github.com:prysmaticlabs/prysm into stream-block * Merge refs/heads/master into stream-block
This commit is contained in:
@@ -8,6 +8,7 @@ go_library(
|
||||
visibility = ["//beacon-chain:__subpackages__"],
|
||||
deps = [
|
||||
"//beacon-chain/core/epoch/precompute:go_default_library",
|
||||
"//beacon-chain/core/feed/block:go_default_library",
|
||||
"//beacon-chain/core/feed/operation:go_default_library",
|
||||
"//beacon-chain/core/feed/state:go_default_library",
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/go-ssz"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/epoch/precompute"
|
||||
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
|
||||
opfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
|
||||
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
@@ -34,6 +35,7 @@ type ChainService struct {
|
||||
Fork *pb.Fork
|
||||
DB db.Database
|
||||
stateNotifier statefeed.Notifier
|
||||
blockNotifier blockfeed.Notifier
|
||||
opNotifier opfeed.Notifier
|
||||
}
|
||||
|
||||
@@ -45,6 +47,27 @@ func (ms *ChainService) StateNotifier() statefeed.Notifier {
|
||||
return ms.stateNotifier
|
||||
}
|
||||
|
||||
// BlockNotifier mocks the same method in the chain service.
|
||||
func (ms *ChainService) BlockNotifier() blockfeed.Notifier {
|
||||
if ms.blockNotifier == nil {
|
||||
ms.blockNotifier = &MockBlockNotifier{}
|
||||
}
|
||||
return ms.blockNotifier
|
||||
}
|
||||
|
||||
// MockBlockNotifier mocks the block notifier.
|
||||
type MockBlockNotifier struct {
|
||||
feed *event.Feed
|
||||
}
|
||||
|
||||
// BlockFeed returns a block feed.
|
||||
func (msn *MockBlockNotifier) BlockFeed() *event.Feed {
|
||||
if msn.feed == nil {
|
||||
msn.feed = new(event.Feed)
|
||||
}
|
||||
return msn.feed
|
||||
}
|
||||
|
||||
// MockStateNotifier mocks the state notifier.
|
||||
type MockStateNotifier struct {
|
||||
feed *event.Feed
|
||||
|
||||
15
beacon-chain/core/feed/block/BUILD.bazel
Normal file
15
beacon-chain/core/feed/block/BUILD.bazel
Normal file
@@ -0,0 +1,15 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"events.go",
|
||||
"notifier.go",
|
||||
],
|
||||
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block",
|
||||
visibility = ["//beacon-chain:__subpackages__"],
|
||||
deps = [
|
||||
"//shared/event:go_default_library",
|
||||
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
|
||||
],
|
||||
)
|
||||
15
beacon-chain/core/feed/block/events.go
Normal file
15
beacon-chain/core/feed/block/events.go
Normal file
@@ -0,0 +1,15 @@
|
||||
package block
|
||||
|
||||
import (
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
)
|
||||
|
||||
const (
|
||||
// ReceivedBlock is sent after a block has been received by the beacon node via p2p or RPC.
|
||||
ReceivedBlock = iota + 1
|
||||
)
|
||||
|
||||
// ReceivedBlockData is the data sent with ReceivedBlock events.
|
||||
type ReceivedBlockData struct {
|
||||
SignedBlock *ethpb.SignedBeaconBlock
|
||||
}
|
||||
8
beacon-chain/core/feed/block/notifier.go
Normal file
8
beacon-chain/core/feed/block/notifier.go
Normal file
@@ -0,0 +1,8 @@
|
||||
package block
|
||||
|
||||
import "github.com/prysmaticlabs/prysm/shared/event"
|
||||
|
||||
// Notifier interface defines the methods of the service that provides block updates to consumers.
|
||||
type Notifier interface {
|
||||
BlockFeed() *event.Feed
|
||||
}
|
||||
@@ -64,6 +64,7 @@ type BeaconNode struct {
|
||||
exitPool *voluntaryexits.Pool
|
||||
depositCache *depositcache.DepositCache
|
||||
stateFeed *event.Feed
|
||||
blockFeed *event.Feed
|
||||
opFeed *event.Feed
|
||||
forkChoiceStore forkchoice.ForkChoicer
|
||||
}
|
||||
@@ -104,6 +105,7 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) {
|
||||
services: registry,
|
||||
stop: make(chan struct{}),
|
||||
stateFeed: new(event.Feed),
|
||||
blockFeed: new(event.Feed),
|
||||
opFeed: new(event.Feed),
|
||||
attestationPool: attestations.NewPool(),
|
||||
exitPool: voluntaryexits.NewPool(),
|
||||
@@ -169,6 +171,11 @@ func (b *BeaconNode) StateFeed() *event.Feed {
|
||||
return b.stateFeed
|
||||
}
|
||||
|
||||
// BlockFeed implements blockfeed.Notifier.
|
||||
func (b *BeaconNode) BlockFeed() *event.Feed {
|
||||
return b.blockFeed
|
||||
}
|
||||
|
||||
// OperationFeed implements opfeed.Notifier.
|
||||
func (b *BeaconNode) OperationFeed() *event.Feed {
|
||||
return b.opFeed
|
||||
@@ -407,6 +414,7 @@ func (b *BeaconNode) registerSyncService(ctx *cli.Context) error {
|
||||
Chain: chainService,
|
||||
InitialSync: initSync,
|
||||
StateNotifier: b,
|
||||
BlockNotifier: b,
|
||||
AttPool: b.attestationPool,
|
||||
ExitPool: b.exitPool,
|
||||
})
|
||||
@@ -415,7 +423,6 @@ func (b *BeaconNode) registerSyncService(ctx *cli.Context) error {
|
||||
}
|
||||
|
||||
func (b *BeaconNode) registerInitialSyncService(ctx *cli.Context) error {
|
||||
|
||||
var chainService *blockchain.Service
|
||||
if err := b.services.FetchService(&chainService); err != nil {
|
||||
return err
|
||||
@@ -426,6 +433,7 @@ func (b *BeaconNode) registerInitialSyncService(ctx *cli.Context) error {
|
||||
Chain: chainService,
|
||||
P2P: b.fetchP2P(ctx),
|
||||
StateNotifier: b,
|
||||
BlockNotifier: b,
|
||||
})
|
||||
|
||||
return b.services.RegisterService(is)
|
||||
@@ -495,6 +503,7 @@ func (b *BeaconNode) registerRPCService(ctx *cli.Context) error {
|
||||
SyncService: syncService,
|
||||
DepositFetcher: depositFetcher,
|
||||
PendingDepositFetcher: b.depositCache,
|
||||
BlockNotifier: b,
|
||||
StateNotifier: b,
|
||||
OperationNotifier: b,
|
||||
SlasherCert: slasherCert,
|
||||
|
||||
@@ -9,6 +9,7 @@ go_library(
|
||||
"//beacon-chain/blockchain:go_default_library",
|
||||
"//beacon-chain/cache:go_default_library",
|
||||
"//beacon-chain/cache/depositcache:go_default_library",
|
||||
"//beacon-chain/core/feed/block:go_default_library",
|
||||
"//beacon-chain/core/feed/operation:go_default_library",
|
||||
"//beacon-chain/core/feed/state:go_default_library",
|
||||
"//beacon-chain/db:go_default_library",
|
||||
|
||||
@@ -17,6 +17,7 @@ go_library(
|
||||
"//beacon-chain/blockchain:go_default_library",
|
||||
"//beacon-chain/core/epoch/precompute:go_default_library",
|
||||
"//beacon-chain/core/feed:go_default_library",
|
||||
"//beacon-chain/core/feed/block:go_default_library",
|
||||
"//beacon-chain/core/feed/state:go_default_library",
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/core/state:go_default_library",
|
||||
@@ -57,6 +58,7 @@ go_test(
|
||||
"//beacon-chain/blockchain/testing:go_default_library",
|
||||
"//beacon-chain/core/epoch/precompute:go_default_library",
|
||||
"//beacon-chain/core/feed:go_default_library",
|
||||
"//beacon-chain/core/feed/block:go_default_library",
|
||||
"//beacon-chain/core/feed/state:go_default_library",
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/db:go_default_library",
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/go-ssz"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
||||
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
|
||||
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
|
||||
@@ -174,7 +175,37 @@ func (bs *Server) GetChainHead(ctx context.Context, _ *ptypes.Empty) (*ethpb.Cha
|
||||
|
||||
// StreamBlocks to clients every single time a block is received by the beacon node.
|
||||
func (bs *Server) StreamBlocks(_ *ptypes.Empty, stream ethpb.BeaconChain_StreamBlocksServer) error {
|
||||
return status.Error(codes.Unimplemented, "Unimplemented")
|
||||
blocksChannel := make(chan *feed.Event, 1)
|
||||
blockSub := bs.BlockNotifier.BlockFeed().Subscribe(blocksChannel)
|
||||
defer blockSub.Unsubscribe()
|
||||
for {
|
||||
select {
|
||||
case event := <-blocksChannel:
|
||||
if event.Type == blockfeed.ReceivedBlock {
|
||||
data, ok := event.Data.(*blockfeed.ReceivedBlockData)
|
||||
if !ok {
|
||||
return status.Errorf(
|
||||
codes.FailedPrecondition,
|
||||
"Could not subscribe to block feed, received bad data: %v",
|
||||
data,
|
||||
)
|
||||
}
|
||||
if data.SignedBlock == nil {
|
||||
// One nil block shouldn't stop the stream.
|
||||
continue
|
||||
}
|
||||
if err := stream.Send(data.SignedBlock.Block); err != nil {
|
||||
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
|
||||
}
|
||||
}
|
||||
case <-blockSub.Err():
|
||||
return status.Error(codes.Aborted, "Subscriber closed, exiting goroutine")
|
||||
case <-bs.Ctx.Done():
|
||||
return status.Error(codes.Canceled, "Context canceled")
|
||||
case <-stream.Context().Done():
|
||||
return status.Error(codes.Canceled, "Context canceled")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// StreamChainHead to clients every single time the head block and state of the chain change.
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
"github.com/prysmaticlabs/go-ssz"
|
||||
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
||||
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
|
||||
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
dbTest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
|
||||
@@ -589,3 +590,72 @@ func TestServer_StreamChainHead_OnHeadUpdated(t *testing.T) {
|
||||
}
|
||||
<-exitRoutine
|
||||
}
|
||||
|
||||
func TestServer_StreamBlocks_ContextCanceled(t *testing.T) {
|
||||
db := dbTest.SetupDB(t)
|
||||
defer dbTest.TeardownDB(t, db)
|
||||
ctx := context.Background()
|
||||
|
||||
chainService := &mock.ChainService{}
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
server := &Server{
|
||||
Ctx: ctx,
|
||||
BlockNotifier: chainService.BlockNotifier(),
|
||||
BeaconDB: db,
|
||||
}
|
||||
|
||||
exitRoutine := make(chan bool)
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
mockStream := mockRPC.NewMockBeaconChain_StreamBlocksServer(ctrl)
|
||||
mockStream.EXPECT().Context().Return(ctx)
|
||||
go func(tt *testing.T) {
|
||||
if err := server.StreamBlocks(&ptypes.Empty{}, mockStream); !strings.Contains(err.Error(), "Context canceled") {
|
||||
tt.Errorf("Could not call RPC method: %v", err)
|
||||
}
|
||||
<-exitRoutine
|
||||
}(t)
|
||||
cancel()
|
||||
exitRoutine <- true
|
||||
}
|
||||
|
||||
func TestServer_StreamBlocks_OnHeadUpdated(t *testing.T) {
|
||||
db := dbTest.SetupDB(t)
|
||||
defer dbTest.TeardownDB(t, db)
|
||||
|
||||
b := ðpb.SignedBeaconBlock{
|
||||
Block: ðpb.BeaconBlock{
|
||||
Slot: 1,
|
||||
},
|
||||
}
|
||||
|
||||
chainService := &mock.ChainService{}
|
||||
ctx := context.Background()
|
||||
server := &Server{
|
||||
Ctx: ctx,
|
||||
BlockNotifier: chainService.BlockNotifier(),
|
||||
}
|
||||
exitRoutine := make(chan bool)
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
mockStream := mockRPC.NewMockBeaconChain_StreamBlocksServer(ctrl)
|
||||
mockStream.EXPECT().Send(b.Block).Do(func(arg0 interface{}) {
|
||||
exitRoutine <- true
|
||||
})
|
||||
mockStream.EXPECT().Context().Return(ctx).AnyTimes()
|
||||
|
||||
go func(tt *testing.T) {
|
||||
if err := server.StreamBlocks(&ptypes.Empty{}, mockStream); err != nil {
|
||||
tt.Errorf("Could not call RPC method: %v", err)
|
||||
}
|
||||
}(t)
|
||||
|
||||
// Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed).
|
||||
for sent := 0; sent == 0; {
|
||||
sent = server.BlockNotifier.BlockFeed().Send(&feed.Event{
|
||||
Type: blockfeed.ReceivedBlock,
|
||||
Data: &blockfeed.ReceivedBlockData{SignedBlock: b},
|
||||
})
|
||||
}
|
||||
<-exitRoutine
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
|
||||
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
|
||||
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/db"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
|
||||
@@ -25,6 +26,7 @@ type Server struct {
|
||||
FinalizationFetcher blockchain.FinalizationFetcher
|
||||
ParticipationFetcher blockchain.ParticipationFetcher
|
||||
StateNotifier statefeed.Notifier
|
||||
BlockNotifier blockfeed.Notifier
|
||||
Pool attestations.Pool
|
||||
IncomingAttestation chan *ethpb.Attestation
|
||||
CanonicalStateChan chan *pbp2p.BeaconState
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache"
|
||||
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
|
||||
opfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
|
||||
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/db"
|
||||
@@ -81,6 +82,7 @@ type Service struct {
|
||||
depositFetcher depositcache.DepositFetcher
|
||||
pendingDepositFetcher depositcache.PendingDepositsFetcher
|
||||
stateNotifier statefeed.Notifier
|
||||
blockNotifier blockfeed.Notifier
|
||||
operationNotifier opfeed.Notifier
|
||||
slasherConn *grpc.ClientConn
|
||||
slasherProvider string
|
||||
@@ -116,6 +118,7 @@ type Config struct {
|
||||
SlasherProvider string
|
||||
SlasherCert string
|
||||
StateNotifier statefeed.Notifier
|
||||
BlockNotifier blockfeed.Notifier
|
||||
OperationNotifier opfeed.Notifier
|
||||
}
|
||||
|
||||
@@ -151,6 +154,7 @@ func NewService(ctx context.Context, cfg *Config) *Service {
|
||||
canonicalStateChan: make(chan *pbp2p.BeaconState, params.BeaconConfig().DefaultBufferSize),
|
||||
incomingAttestation: make(chan *ethpb.Attestation, params.BeaconConfig().DefaultBufferSize),
|
||||
stateNotifier: cfg.StateNotifier,
|
||||
blockNotifier: cfg.BlockNotifier,
|
||||
operationNotifier: cfg.OperationNotifier,
|
||||
slasherProvider: cfg.SlasherProvider,
|
||||
slasherCert: cfg.SlasherCert,
|
||||
@@ -217,6 +221,7 @@ func (s *Service) Start() {
|
||||
Eth1InfoFetcher: s.powChainService,
|
||||
SyncChecker: s.syncService,
|
||||
StateNotifier: s.stateNotifier,
|
||||
BlockNotifier: s.blockNotifier,
|
||||
OperationNotifier: s.operationNotifier,
|
||||
P2P: s.p2p,
|
||||
BlockReceiver: s.blockReceiver,
|
||||
@@ -242,6 +247,7 @@ func (s *Service) Start() {
|
||||
ChainStartFetcher: s.chainStartFetcher,
|
||||
CanonicalStateChan: s.canonicalStateChan,
|
||||
StateNotifier: s.stateNotifier,
|
||||
BlockNotifier: s.blockNotifier,
|
||||
SlotTicker: ticker,
|
||||
}
|
||||
aggregatorServer := &aggregator.Server{
|
||||
|
||||
@@ -18,6 +18,7 @@ go_library(
|
||||
"//beacon-chain/cache/depositcache:go_default_library",
|
||||
"//beacon-chain/core/blocks:go_default_library",
|
||||
"//beacon-chain/core/feed:go_default_library",
|
||||
"//beacon-chain/core/feed/block:go_default_library",
|
||||
"//beacon-chain/core/feed/operation:go_default_library",
|
||||
"//beacon-chain/core/feed/state:go_default_library",
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
|
||||
@@ -10,6 +10,8 @@ import (
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/go-ssz"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
||||
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/state/interop"
|
||||
@@ -105,6 +107,10 @@ func (vs *Server) ProposeBlock(ctx context.Context, blk *ethpb.SignedBeaconBlock
|
||||
}
|
||||
log.WithField("blockRoot", fmt.Sprintf("%#x", bytesutil.Trunc(root[:]))).Debugf(
|
||||
"Block proposal received via RPC")
|
||||
vs.BlockNotifier.BlockFeed().Send(&feed.Event{
|
||||
Type: blockfeed.ReceivedBlock,
|
||||
Data: blockfeed.ReceivedBlockData{SignedBlock: blk},
|
||||
})
|
||||
if err := vs.BlockReceiver.ReceiveBlock(ctx, blk); err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "Could not process beacon block: %v", err)
|
||||
}
|
||||
|
||||
@@ -130,13 +130,15 @@ func TestProposeBlock_OK(t *testing.T) {
|
||||
t.Fatalf("Could not save genesis state: %v", err)
|
||||
}
|
||||
|
||||
c := &mock.ChainService{}
|
||||
proposerServer := &Server{
|
||||
BeaconDB: db,
|
||||
ChainStartFetcher: &mockPOW.POWChain{},
|
||||
Eth1InfoFetcher: &mockPOW.POWChain{},
|
||||
Eth1BlockFetcher: &mockPOW.POWChain{},
|
||||
BlockReceiver: &mock.ChainService{},
|
||||
HeadFetcher: &mock.ChainService{},
|
||||
BlockReceiver: c,
|
||||
HeadFetcher: c,
|
||||
BlockNotifier: c.BlockNotifier(),
|
||||
}
|
||||
req := ðpb.SignedBeaconBlock{
|
||||
Block: ðpb.BeaconBlock{
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
||||
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
|
||||
opfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
|
||||
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
@@ -50,6 +51,7 @@ type Server struct {
|
||||
Eth1InfoFetcher powchain.ChainInfoFetcher
|
||||
SyncChecker sync.Checker
|
||||
StateNotifier statefeed.Notifier
|
||||
BlockNotifier blockfeed.Notifier
|
||||
P2P p2p.Broadcaster
|
||||
AttPool attestations.Pool
|
||||
ExitPool *voluntaryexits.Pool
|
||||
|
||||
@@ -36,6 +36,7 @@ go_library(
|
||||
"//beacon-chain/blockchain:go_default_library",
|
||||
"//beacon-chain/core/blocks:go_default_library",
|
||||
"//beacon-chain/core/feed:go_default_library",
|
||||
"//beacon-chain/core/feed/block:go_default_library",
|
||||
"//beacon-chain/core/feed/state:go_default_library",
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/core/state:go_default_library",
|
||||
|
||||
@@ -12,6 +12,7 @@ go_library(
|
||||
deps = [
|
||||
"//beacon-chain/blockchain:go_default_library",
|
||||
"//beacon-chain/core/feed:go_default_library",
|
||||
"//beacon-chain/core/feed/block:go_default_library",
|
||||
"//beacon-chain/core/feed/state:go_default_library",
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/db:go_default_library",
|
||||
|
||||
@@ -13,6 +13,8 @@ import (
|
||||
"github.com/paulbellamy/ratecounter"
|
||||
"github.com/pkg/errors"
|
||||
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
||||
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
|
||||
prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
|
||||
@@ -210,6 +212,10 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
|
||||
log.Debugf("Beacon node doesn't have a block in db with root %#x", blk.Block.ParentRoot)
|
||||
continue
|
||||
}
|
||||
s.blockNotifier.BlockFeed().Send(&feed.Event{
|
||||
Type: blockfeed.ReceivedBlock,
|
||||
Data: blockfeed.ReceivedBlockData{SignedBlock: blk},
|
||||
})
|
||||
if featureconfig.Get().InitSyncNoVerify {
|
||||
if err := s.chain.ReceiveBlockNoVerify(ctx, blk); err != nil {
|
||||
return err
|
||||
|
||||
@@ -262,6 +262,7 @@ func TestRoundRobinSync(t *testing.T) {
|
||||
} // no-op mock
|
||||
s := &Service{
|
||||
chain: mc,
|
||||
blockNotifier: mc.BlockNotifier(),
|
||||
p2p: p,
|
||||
db: beaconDB,
|
||||
synced: false,
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
||||
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
|
||||
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/db"
|
||||
@@ -38,6 +39,7 @@ type Config struct {
|
||||
DB db.ReadOnlyDatabase
|
||||
Chain blockchainService
|
||||
StateNotifier statefeed.Notifier
|
||||
BlockNotifier blockfeed.Notifier
|
||||
}
|
||||
|
||||
// Service service.
|
||||
@@ -49,6 +51,7 @@ type Service struct {
|
||||
synced bool
|
||||
chainStarted bool
|
||||
stateNotifier statefeed.Notifier
|
||||
blockNotifier blockfeed.Notifier
|
||||
blocksRateLimiter *leakybucket.Collector
|
||||
}
|
||||
|
||||
@@ -61,6 +64,7 @@ func NewInitialSync(cfg *Config) *Service {
|
||||
p2p: cfg.P2P,
|
||||
db: cfg.DB,
|
||||
stateNotifier: cfg.StateNotifier,
|
||||
blockNotifier: cfg.BlockNotifier,
|
||||
blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksPerSecond, false /* deleteEmptyBuckets */),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
|
||||
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
|
||||
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/db"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
|
||||
@@ -30,6 +31,7 @@ type Config struct {
|
||||
Chain blockchainService
|
||||
InitialSync Checker
|
||||
StateNotifier statefeed.Notifier
|
||||
BlockNotifier blockfeed.Notifier
|
||||
}
|
||||
|
||||
// This defines the interface for interacting with block chain service
|
||||
@@ -58,6 +60,7 @@ func NewRegularSync(cfg *Config) *Service {
|
||||
seenPendingBlocks: make(map[[32]byte]bool),
|
||||
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
|
||||
stateNotifier: cfg.StateNotifier,
|
||||
blockNotifier: cfg.BlockNotifier,
|
||||
blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */),
|
||||
}
|
||||
|
||||
@@ -86,6 +89,7 @@ type Service struct {
|
||||
initialSync Checker
|
||||
validateBlockLock sync.RWMutex
|
||||
stateNotifier statefeed.Notifier
|
||||
blockNotifier blockfeed.Notifier
|
||||
blocksRateLimiter *leakybucket.Collector
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,8 @@ import (
|
||||
"github.com/gogo/protobuf/proto"
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/go-ssz"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
||||
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/state/interop"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
@@ -52,6 +54,15 @@ func (r *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Broadcast the block on a feed to notify other services in the beacon node
|
||||
// of a received block (even if it does not process correctly through a state transition).
|
||||
r.blockNotifier.BlockFeed().Send(&feed.Event{
|
||||
Type: blockfeed.ReceivedBlock,
|
||||
Data: &blockfeed.ReceivedBlockData{
|
||||
SignedBlock: signed,
|
||||
},
|
||||
})
|
||||
|
||||
err = r.chain.ReceiveBlockNoPubsub(ctx, signed)
|
||||
if err != nil {
|
||||
interop.WriteBlockToDisk(signed, true /*failed*/)
|
||||
|
||||
@@ -40,10 +40,12 @@ func TestRegularSyncBeaconBlockSubscriber_FilterByFinalizedEpoch(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
parentRoot, _ := ssz.HashTreeRoot(parent.Block)
|
||||
chain := &mock.ChainService{State: s}
|
||||
r := &Service{
|
||||
db: db,
|
||||
chain: &mock.ChainService{State: s},
|
||||
attPool: attestations.NewPool(),
|
||||
db: db,
|
||||
chain: chain,
|
||||
blockNotifier: chain.BlockNotifier(),
|
||||
attPool: attestations.NewPool(),
|
||||
}
|
||||
|
||||
b := ðpb.SignedBeaconBlock{
|
||||
|
||||
Reference in New Issue
Block a user