Fix Invalid SSZ in Block Roots RPC Request (#6405)

* fix block roots error
* remove comment
* fix
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fixBlockRootsError
* change back
* add stream deadlines
* Merge branch 'master' into fixBlockRootsError
* add todos
* Merge branch 'fixBlockRootsError' of https://github.com/prysmaticlabs/geth-sharding into fixBlockRootsError
* Update beacon-chain/sync/rpc_beacon_blocks_by_root.go

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
* gofmt
* fix test
* fix failing test
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fixBlockRootsError
* fix all
* Merge branch 'master' into fixBlockRootsError
* Merge branch 'master' into fixBlockRootsError
* Merge refs/heads/master into fixBlockRootsError
This commit is contained in:
Nishant Das
2020-06-27 01:05:04 +08:00
committed by GitHub
parent ab89053597
commit f1c0b2e0da
8 changed files with 126 additions and 30 deletions

View File

@@ -109,7 +109,19 @@ func (e SszNetworkEncoder) doDecode(b []byte, to interface{}) error {
if v, ok := to.(fastssz.Unmarshaler); ok {
return v.UnmarshalSSZ(b)
}
return ssz.Unmarshal(b, to)
err := ssz.Unmarshal(b, to)
if err != nil {
// Check if we are unmarshalling block roots
// and then lop off the 4 byte offset and try
// unmarshalling again. This is temporary to
// avoid too much disruption to onyx nodes.
// TODO(#6408)
if _, ok := to.(*[][32]byte); ok {
return ssz.Unmarshal(b[4:], to)
}
return err
}
return nil
}
// DecodeGossip decodes the bytes to the protobuf gossip message provided.

View File

@@ -160,6 +160,8 @@ go_test(
"@com_github_libp2p_go_libp2p_core//protocol:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
"@com_github_protolambda_zssz//:go_default_library",
"@com_github_protolambda_zssz//types:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_prysmaticlabs_go_ssz//:go_default_library",

View File

@@ -3,6 +3,7 @@ package sync
import (
"context"
"encoding/hex"
"io"
"sync"
pubsub "github.com/libp2p/go-libp2p-pubsub"
@@ -144,10 +145,12 @@ func (s *Service) processPendingAtts(ctx context.Context) error {
}
}
req := [][]byte{bRoot[:]}
if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil {
traceutil.AnnotateError(span, err)
log.Errorf("Could not send recent block request: %v", err)
req := [][32]byte{bRoot}
if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil && err == io.EOF {
if err = s.sendRecentBeaconBlocksRequestFallback(ctx, req, pid); err != nil {
traceutil.AnnotateError(span, err)
log.Errorf("Could not send recent block request: %v", err)
}
}
}
}

View File

@@ -77,7 +77,7 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
"currentSlot": b.Block.Slot,
"parentRoot": hex.EncodeToString(bytesutil.Trunc(b.Block.ParentRoot)),
}).Info("Requesting parent block")
req := [][]byte{b.Block.ParentRoot}
req := [][32]byte{bytesutil.ToBytes32(b.Block.ParentRoot)}
// Start with a random peer to query, but choose the first peer in our unsorted list that claims to
// have a head slot newer than the block slot we are requesting.
@@ -94,8 +94,10 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
}
if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil {
traceutil.AnnotateError(span, err)
log.Errorf("Could not send recent block request: %v", err)
if err = s.sendRecentBeaconBlocksRequestFallback(ctx, req, pid); err != nil {
traceutil.AnnotateError(span, err)
log.Errorf("Could not send recent block request: %v", err)
}
}
span.End()
continue

View File

@@ -49,7 +49,7 @@ func (s *Service) registerRPCHandlers() {
)
s.registerRPC(
p2p.RPCBlocksByRootTopic,
&pb.BeaconBlocksByRootRequest{},
[][32]byte{},
s.beaconBlocksRootRPCHandler,
)
s.registerRPC(

View File

@@ -11,17 +11,63 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
)
// sendRecentBeaconBlocksRequest sends a recent beacon blocks request to a peer to get
// those corresponding blocks from that peer.
func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots [][]byte, id peer.ID) error {
func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots [][32]byte, id peer.ID) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
req := &pbp2p.BeaconBlocksByRootRequest{BlockRoots: blockRoots}
stream, err := s.p2p.Send(ctx, blockRoots, p2p.RPCBlocksByRootTopic, id)
if err != nil {
return err
}
defer func() {
if err := stream.Reset(); err != nil {
log.WithError(err).Errorf("Failed to reset stream with protocol %s", stream.Protocol())
}
}()
for i := 0; i < len(blockRoots); i++ {
blk, err := ReadChunkedBlock(stream, s.p2p)
// Return error until #6408 is resolved.
if err == io.EOF {
return err
}
// Exit if peer sends more than max request blocks.
if uint64(i) >= params.BeaconNetworkConfig().MaxRequestBlocks {
break
}
if err != nil {
log.WithError(err).Error("Unable to retrieve block from stream")
return err
}
blkRoot, err := stateutil.BlockRoot(blk.Block)
if err != nil {
return err
}
s.pendingQueueLock.Lock()
s.slotToPendingBlocks[blk.Block.Slot] = blk
s.seenPendingBlocks[blkRoot] = true
s.pendingQueueLock.Unlock()
}
return nil
}
// Deprecated: sendRecentBeaconBlocksRequestFallback sends a recent beacon blocks request to a peer to get
// those corresponding blocks from that peer. This is a method implemented so that we are eventually
// backward compatible with old Onyx nodes.
// TODO(#6408)
func (s *Service) sendRecentBeaconBlocksRequestFallback(ctx context.Context, blockRoots [][32]byte, id peer.ID) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
req := &pbp2p.BeaconBlocksByRootRequest{}
for _, root := range blockRoots {
req.BlockRoots = append(req.BlockRoots, root[:])
}
stream, err := s.p2p.Send(ctx, req, p2p.RPCBlocksByRootTopic, id)
if err != nil {
return err
@@ -70,11 +116,11 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
SetRPCStreamDeadlines(stream)
log := log.WithField("handler", "beacon_blocks_by_root")
req, ok := msg.(*pbp2p.BeaconBlocksByRootRequest)
blockRoots, ok := msg.([][32]byte)
if !ok {
return errors.New("message is not type BeaconBlocksByRootRequest")
}
if len(req.BlockRoots) == 0 {
if len(blockRoots) == 0 {
resp, err := s.generateErrorResponse(responseCodeInvalidRequest, "no block roots provided in request")
if err != nil {
log.WithError(err).Error("Failed to generate a response error")
@@ -86,7 +132,7 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
return errors.New("no block roots provided")
}
if int64(len(req.BlockRoots)) > s.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) {
if int64(len(blockRoots)) > s.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) {
s.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer())
if s.p2p.Peers().IsBad(stream.Conn().RemotePeer()) {
log.Debug("Disconnecting bad peer")
@@ -107,7 +153,7 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
return errors.New(rateLimitedError)
}
if uint64(len(req.BlockRoots)) > params.BeaconNetworkConfig().MaxRequestBlocks {
if uint64(len(blockRoots)) > params.BeaconNetworkConfig().MaxRequestBlocks {
resp, err := s.generateErrorResponse(responseCodeInvalidRequest, "requested more than the max block limit")
if err != nil {
log.WithError(err).Error("Failed to generate a response error")
@@ -119,10 +165,10 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
return errors.New("requested more than the max block limit")
}
s.blocksRateLimiter.Add(stream.Conn().RemotePeer().String(), int64(len(req.BlockRoots)))
s.blocksRateLimiter.Add(stream.Conn().RemotePeer().String(), int64(len(blockRoots)))
for _, root := range req.BlockRoots {
blk, err := s.db.Block(ctx, bytesutil.ToBytes32(root))
for _, root := range blockRoots {
blk, err := s.db.Block(ctx, root)
if err != nil {
log.WithError(err).Error("Failed to fetch block")
resp, err := s.generateErrorResponse(responseCodeServerError, genericError)

View File

@@ -1,6 +1,7 @@
package sync
import (
"bytes"
"context"
"reflect"
"sync"
@@ -10,6 +11,8 @@ import (
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/protolambda/zssz"
"github.com/protolambda/zssz/types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
@@ -17,7 +20,6 @@ import (
db "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
)
@@ -31,7 +33,7 @@ func TestRecentBeaconBlocksRPCHandler_ReturnsBlocks(t *testing.T) {
}
d, _ := db.SetupDB(t)
var blkRoots [][]byte
var blkRoots [][32]byte
// Populate the database with blocks that would match the request.
for i := 1; i < 11; i++ {
blk := &ethpb.BeaconBlock{
@@ -44,7 +46,7 @@ func TestRecentBeaconBlocksRPCHandler_ReturnsBlocks(t *testing.T) {
if err := d.SaveBlock(context.Background(), &ethpb.SignedBeaconBlock{Block: blk}); err != nil {
t.Fatal(err)
}
blkRoots = append(blkRoots, root[:])
blkRoots = append(blkRoots, root)
}
r := &Service{p2p: p1, db: d, blocksRateLimiter: leakybucket.NewCollector(10000, 10000, false)}
@@ -70,8 +72,7 @@ func TestRecentBeaconBlocksRPCHandler_ReturnsBlocks(t *testing.T) {
if err != nil {
t.Fatal(err)
}
req := &pb.BeaconBlocksByRootRequest{BlockRoots: blkRoots}
err = r.beaconBlocksRootRPCHandler(context.Background(), req, stream1)
err = r.beaconBlocksRootRPCHandler(context.Background(), blkRoots, stream1)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
@@ -112,7 +113,7 @@ func TestRecentBeaconBlocks_RPCRequestSent(t *testing.T) {
Root: blockBRoot[:],
}
expectedRoots := [][]byte{blockBRoot[:], blockARoot[:]}
expectedRoots := [][32]byte{blockBRoot, blockARoot}
r := &Service{
p2p: p1,
@@ -133,12 +134,12 @@ func TestRecentBeaconBlocks_RPCRequestSent(t *testing.T) {
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
defer wg.Done()
out := &pb.BeaconBlocksByRootRequest{BlockRoots: [][]byte{}}
if err := p2.Encoding().DecodeWithLength(stream, out); err != nil {
out := [][32]byte{}
if err := p2.Encoding().DecodeWithLength(stream, &out); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(out.BlockRoots, expectedRoots) {
t.Fatalf("Did not receive expected message. Got %+v wanted %+v", out.BlockRoots, expectedRoots)
if !reflect.DeepEqual(out, expectedRoots) {
t.Fatalf("Did not receive expected message. Got %+v wanted %+v", out, expectedRoots)
}
response := []*ethpb.SignedBeaconBlock{blockB, blockA}
for _, blk := range response {
@@ -161,3 +162,33 @@ func TestRecentBeaconBlocks_RPCRequestSent(t *testing.T) {
t.Fatal("Did not receive stream within 1 sec")
}
}
type testList [][32]byte
func (*testList) Limit() uint64 {
return 2 << 10
}
func TestSSZCompatibility(t *testing.T) {
rootA := [32]byte{'a'}
rootB := [32]byte{'B'}
rootC := [32]byte{'C'}
list := testList{rootA, rootB, rootC}
writer := bytes.NewBuffer([]byte{})
sszType, err := types.SSZFactory(reflect.TypeOf(list))
if err != nil {
t.Error(err)
}
n, err := zssz.Encode(writer, list, sszType)
if err != nil {
t.Error(err)
}
encodedPart := writer.Bytes()[:n]
fastSSZ, err := ssz.Marshal(list)
if err != nil {
t.Error(err)
}
if !bytes.Equal(fastSSZ, encodedPart) {
t.Errorf("Wanted the same result as ZSSZ of %#x but got %#X", encodedPart, fastSSZ)
}
}

View File

@@ -17,7 +17,7 @@ message BeaconBlocksByRangeRequest {
uint64 count = 2;
uint64 step = 3;
}
// Deprecated: Will eventually be removed is only kept around for backward compatibility.
message BeaconBlocksByRootRequest {
repeated bytes block_roots = 1 [(gogoproto.moretags) = "ssz-size:\"?,32\" ssz-max:\"1024\""];
}