Stream verified block (#8206)

This commit is contained in:
terence tsao
2021-01-05 12:40:11 -08:00
committed by GitHub
parent 9d737d60f4
commit 023e258f6a
13 changed files with 147 additions and 45 deletions

View File

@@ -51,9 +51,10 @@ func (s *Service) ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlo
s.stateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.BlockProcessed,
Data: &statefeed.BlockProcessedData{
Slot: blockCopy.Block.Slot,
BlockRoot: blockRoot,
Verified: true,
Slot: blockCopy.Block.Slot,
BlockRoot: blockRoot,
SignedBlock: blockCopy,
Verified: true,
},
})
@@ -97,9 +98,10 @@ func (s *Service) ReceiveBlockInitialSync(ctx context.Context, block *ethpb.Sign
s.stateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.BlockProcessed,
Data: &statefeed.BlockProcessedData{
Slot: blockCopy.Block.Slot,
BlockRoot: blockRoot,
Verified: true,
Slot: blockCopy.Block.Slot,
BlockRoot: blockRoot,
SignedBlock: blockCopy,
Verified: true,
},
})
@@ -141,9 +143,10 @@ func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []*ethpb.SignedB
s.stateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.BlockProcessed,
Data: &statefeed.BlockProcessedData{
Slot: blockCopy.Block.Slot,
BlockRoot: blkRoots[i],
Verified: true,
Slot: blockCopy.Block.Slot,
BlockRoot: blkRoots[i],
SignedBlock: blockCopy,
Verified: true,
},
})

View File

@@ -8,5 +8,8 @@ go_library(
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state",
visibility = ["//beacon-chain:__subpackages__"],
deps = ["//shared/event:go_default_library"],
deps = [
"//shared/event:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
],
)

View File

@@ -3,7 +3,11 @@
// and chain start.
package state
import "time"
import (
"time"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
)
const (
// BlockProcessed is sent after a block has been processed and updated the state database.
@@ -25,6 +29,8 @@ type BlockProcessedData struct {
Slot uint64
// BlockRoot of the processed block.
BlockRoot [32]byte
// SignedBlock is the physical processed block.
SignedBlock *ethpb.SignedBeaconBlock
// Verified is true if the block's BLS contents have been verified.
Verified bool
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/cmd"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/pagination"
"github.com/prysmaticlabs/prysm/shared/params"
"google.golang.org/grpc/codes"
@@ -175,35 +176,53 @@ func (bs *Server) GetChainHead(ctx context.Context, _ *ptypes.Empty) (*ethpb.Cha
}
// StreamBlocks to clients every single time a block is received by the beacon node.
func (bs *Server) StreamBlocks(_ *ptypes.Empty, stream ethpb.BeaconChain_StreamBlocksServer) error {
func (bs *Server) StreamBlocks(req *ethpb.StreamBlocksRequest, stream ethpb.BeaconChain_StreamBlocksServer) error {
blocksChannel := make(chan *feed.Event, 1)
blockSub := bs.BlockNotifier.BlockFeed().Subscribe(blocksChannel)
var blockSub event.Subscription
if req.VerifiedOnly {
blockSub = bs.StateNotifier.StateFeed().Subscribe(blocksChannel)
} else {
blockSub = bs.BlockNotifier.BlockFeed().Subscribe(blocksChannel)
}
defer blockSub.Unsubscribe()
for {
select {
case event := <-blocksChannel:
if event.Type == blockfeed.ReceivedBlock {
data, ok := event.Data.(*blockfeed.ReceivedBlockData)
if !ok {
// Got bad data over the stream.
continue
}
if data.SignedBlock == nil {
// One nil block shouldn't stop the stream.
continue
}
headState, err := bs.HeadFetcher.HeadState(bs.Ctx)
if err != nil {
log.WithError(err).WithField("blockSlot", data.SignedBlock.Block.Slot).Error("Could not get head state")
continue
if req.VerifiedOnly {
if event.Type == statefeed.BlockProcessed {
data, ok := event.Data.(*statefeed.BlockProcessedData)
if !ok || data == nil {
continue
}
if err := stream.Send(data.SignedBlock); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
}
}
} else {
if event.Type == blockfeed.ReceivedBlock {
data, ok := event.Data.(*blockfeed.ReceivedBlockData)
if !ok {
// Got bad data over the stream.
continue
}
if data.SignedBlock == nil {
// One nil block shouldn't stop the stream.
continue
}
headState, err := bs.HeadFetcher.HeadState(bs.Ctx)
if err != nil {
log.WithError(err).WithField("blockSlot", data.SignedBlock.Block.Slot).Error("Could not get head state")
continue
}
if err := blocks.VerifyBlockSignature(headState, data.SignedBlock); err != nil {
log.WithError(err).WithField("blockSlot", data.SignedBlock.Block.Slot).Error("Could not verify block signature")
continue
}
if err := stream.Send(data.SignedBlock); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
if err := blocks.VerifyBlockSignature(headState, data.SignedBlock); err != nil {
log.WithError(err).WithField("blockSlot", data.SignedBlock.Block.Slot).Error("Could not verify block signature")
continue
}
if err := stream.Send(data.SignedBlock); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
}
}
}
case <-blockSub.Err():

View File

@@ -551,6 +551,34 @@ func TestServer_StreamChainHead_OnHeadUpdated(t *testing.T) {
<-exitRoutine
}
func TestServer_StreamBlocksVerified_ContextCanceled(t *testing.T) {
db := dbTest.SetupDB(t)
ctx := context.Background()
chainService := &chainMock.ChainService{}
ctx, cancel := context.WithCancel(ctx)
server := &Server{
Ctx: ctx,
StateNotifier: chainService.StateNotifier(),
HeadFetcher: chainService,
BeaconDB: db,
}
exitRoutine := make(chan bool)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStream := mock.NewMockBeaconChain_StreamBlocksServer(ctrl)
mockStream.EXPECT().Context().Return(ctx)
go func(tt *testing.T) {
assert.ErrorContains(tt, "Context canceled", server.StreamBlocks(&ethpb.StreamBlocksRequest{
VerifiedOnly: true,
}, mockStream))
<-exitRoutine
}(t)
cancel()
exitRoutine <- true
}
func TestServer_StreamBlocks_ContextCanceled(t *testing.T) {
db := dbTest.SetupDB(t)
ctx := context.Background()
@@ -570,7 +598,7 @@ func TestServer_StreamBlocks_ContextCanceled(t *testing.T) {
mockStream := mock.NewMockBeaconChain_StreamBlocksServer(ctrl)
mockStream.EXPECT().Context().Return(ctx)
go func(tt *testing.T) {
assert.ErrorContains(tt, "Context canceled", server.StreamBlocks(&ptypes.Empty{}, mockStream))
assert.ErrorContains(tt, "Context canceled", server.StreamBlocks(&ethpb.StreamBlocksRequest{}, mockStream))
<-exitRoutine
}(t)
cancel()
@@ -598,7 +626,7 @@ func TestServer_StreamBlocks_OnHeadUpdated(t *testing.T) {
mockStream.EXPECT().Context().Return(ctx).AnyTimes()
go func(tt *testing.T) {
assert.NoError(tt, server.StreamBlocks(&ptypes.Empty{}, mockStream), "Could not call RPC method")
assert.NoError(tt, server.StreamBlocks(&ethpb.StreamBlocksRequest{}, mockStream), "Could not call RPC method")
}(t)
// Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed).
@@ -611,6 +639,47 @@ func TestServer_StreamBlocks_OnHeadUpdated(t *testing.T) {
<-exitRoutine
}
func TestServer_StreamBlocksVerified_OnHeadUpdated(t *testing.T) {
db := dbTest.SetupDB(t)
ctx := context.Background()
beaconState, privs := testutil.DeterministicGenesisState(t, 32)
b, err := testutil.GenerateFullBlock(beaconState, privs, testutil.DefaultBlockGenConfig(), 1)
require.NoError(t, err)
r, err := b.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, db.SaveBlock(ctx, b))
chainService := &chainMock.ChainService{State: beaconState}
server := &Server{
Ctx: ctx,
StateNotifier: chainService.StateNotifier(),
HeadFetcher: chainService,
BeaconDB: db,
}
exitRoutine := make(chan bool)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStream := mock.NewMockBeaconChain_StreamBlocksServer(ctrl)
mockStream.EXPECT().Send(b).Do(func(arg0 interface{}) {
exitRoutine <- true
})
mockStream.EXPECT().Context().Return(ctx).AnyTimes()
go func(tt *testing.T) {
assert.NoError(tt, server.StreamBlocks(&ethpb.StreamBlocksRequest{
VerifiedOnly: true,
}, mockStream), "Could not call RPC method")
}(t)
// Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed).
for sent := 0; sent == 0; {
sent = server.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.BlockProcessed,
Data: &statefeed.BlockProcessedData{Slot: b.Block.Slot, BlockRoot: r, SignedBlock: b},
})
}
<-exitRoutine
}
func TestServer_GetWeakSubjectivityCheckpoint(t *testing.T) {
params.UseMainnetConfig()

View File

@@ -2,6 +2,7 @@ package beacon
import (
"context"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"

View File

@@ -2,10 +2,11 @@ package beacon
import (
"context"
"testing"
"github.com/gogo/protobuf/proto"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"testing"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings"

View File

@@ -2183,8 +2183,8 @@ def prysm_deps():
name = "com_github_prysmaticlabs_ethereumapis",
build_file_generation = "off",
importpath = "github.com/prysmaticlabs/ethereumapis",
sum = "h1:OUfQgEA6zB19I66EQ2nPtjdBbk+Vv7eCBf2+x3BTv5w=",
version = "v0.0.0-20201117145913-073714f478fb",
sum = "h1:69URSziUFhCVzHIDtPGMwYeP1G3JWhuBdB3enLRne6Y=",
version = "v0.0.0-20210105190001-13193818c0df",
)
go_repository(
name = "com_github_prysmaticlabs_go_bitfield",

2
go.mod
View File

@@ -83,7 +83,7 @@ require (
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/tsdb v0.10.0 // indirect
github.com/protolambda/zssz v0.1.5
github.com/prysmaticlabs/ethereumapis v0.0.0-20201117145913-073714f478fb
github.com/prysmaticlabs/ethereumapis v0.0.0-20210105190001-13193818c0df
github.com/prysmaticlabs/go-bitfield v0.0.0-20200618145306-2ae0807bef65
github.com/prysmaticlabs/go-ssz v0.0.0-20200612203617-6d5c9aa213ae
github.com/prysmaticlabs/prombbolt v0.0.0-20200324184628-09789ef63796

4
go.sum
View File

@@ -985,8 +985,8 @@ github.com/protolambda/zssz v0.1.5 h1:7fjJjissZIIaa2QcvmhS/pZISMX21zVITt49sW1oue
github.com/protolambda/zssz v0.1.5/go.mod h1:a4iwOX5FE7/JkKA+J/PH0Mjo9oXftN6P8NZyL28gpag=
github.com/prysmaticlabs/bazel-go-ethereum v0.0.0-20201126065335-1fb46e307951 h1:Jncuyb/nIJgXbEe0iGz3MN5JmijPVGzwk3G5FR01phI=
github.com/prysmaticlabs/bazel-go-ethereum v0.0.0-20201126065335-1fb46e307951/go.mod h1:JIfVb6esrqALTExdz9hRYvrP0xBDf6wCncIu1hNwHpM=
github.com/prysmaticlabs/ethereumapis v0.0.0-20201117145913-073714f478fb h1:OUfQgEA6zB19I66EQ2nPtjdBbk+Vv7eCBf2+x3BTv5w=
github.com/prysmaticlabs/ethereumapis v0.0.0-20201117145913-073714f478fb/go.mod h1:k7b2dxy6RppCG6kmOJkNOXzRpEoTdsPygc2aQhsUsZk=
github.com/prysmaticlabs/ethereumapis v0.0.0-20210105190001-13193818c0df h1:69URSziUFhCVzHIDtPGMwYeP1G3JWhuBdB3enLRne6Y=
github.com/prysmaticlabs/ethereumapis v0.0.0-20210105190001-13193818c0df/go.mod h1:k7b2dxy6RppCG6kmOJkNOXzRpEoTdsPygc2aQhsUsZk=
github.com/prysmaticlabs/go-bitfield v0.0.0-20200322041314-62c2aee71669 h1:cX6YRZnZ9sgMqM5U14llxUiXVNJ3u07Res1IIjTOgtI=
github.com/prysmaticlabs/go-bitfield v0.0.0-20200322041314-62c2aee71669/go.mod h1:hCwmef+4qXWjv0jLDbQdWnL0Ol7cS7/lCSS26WR+u6s=
github.com/prysmaticlabs/go-bitfield v0.0.0-20200618145306-2ae0807bef65 h1:hJfAWrlxx7SKpn4S/h2JGl2HHwA1a2wSS3HAzzZ0F+U=

View File

@@ -399,7 +399,7 @@ func (mr *MockBeaconChainClientMockRecorder) StreamAttestations(arg0, arg1 inter
}
// StreamBlocks mocks base method
func (m *MockBeaconChainClient) StreamBlocks(arg0 context.Context, arg1 *types.Empty, arg2 ...grpc.CallOption) (eth.BeaconChain_StreamBlocksClient, error) {
func (m *MockBeaconChainClient) StreamBlocks(arg0 context.Context, arg1 *eth.StreamBlocksRequest, arg2 ...grpc.CallOption) (eth.BeaconChain_StreamBlocksClient, error) {
m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {

View File

@@ -27,7 +27,7 @@ var reconnectPeriod = 5 * time.Second
func (bs *Service) ReceiveBlocks(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "beaconclient.ReceiveBlocks")
defer span.End()
stream, err := bs.beaconClient.StreamBlocks(ctx, &ptypes.Empty{})
stream, err := bs.beaconClient.StreamBlocks(ctx, &ethpb.StreamBlocksRequest{} /* Prefers unverified block to catch slashing */)
if err != nil {
log.WithError(err).Error("Failed to retrieve blocks stream")
return
@@ -53,7 +53,7 @@ func (bs *Service) ReceiveBlocks(ctx context.Context) {
log.WithError(err).Error("Could not restart beacon connection")
return
}
stream, err = bs.beaconClient.StreamBlocks(ctx, &ptypes.Empty{})
stream, err = bs.beaconClient.StreamBlocks(ctx, &ethpb.StreamBlocksRequest{} /* Prefers unverified block to catch slashing */)
if err != nil {
log.WithError(err).Error("Could not restart block stream")
return

View File

@@ -28,7 +28,7 @@ func TestService_ReceiveBlocks(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
client.EXPECT().StreamBlocks(
gomock.Any(),
&ptypes.Empty{},
&ethpb.StreamBlocksRequest{},
).Return(stream, nil)
stream.EXPECT().Context().Return(ctx).AnyTimes()
stream.EXPECT().Recv().Return(