Beaconblock over wire (#3436)

This commit is contained in:
shayzluf
2019-09-10 19:54:14 +05:30
committed by terence tsao
parent 3708a8f476
commit 1edeb8ec4c
12 changed files with 52 additions and 44 deletions

View File

@@ -28,6 +28,7 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//proto/eth/v1alpha1:go_default_library",
"//proto/testing:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
],

View File

@@ -2,8 +2,6 @@ package encoder
import (
"io"
"github.com/gogo/protobuf/proto"
)
// Defines the different encoding formats
@@ -14,14 +12,16 @@ const (
// NetworkEncoding represents an encoder compatible with Ethereum 2.0 p2p.
type NetworkEncoding interface {
// Decodes to the provided message.
Decode([]byte, proto.Message) error
// DecodeWithLength a bytes from a reader with a varint length prefix.
DecodeWithLength(io.Reader, proto.Message) error
// Encode an arbitrary message to the provided writer.
Encode(io.Writer, proto.Message) (int, error)
// EncodeWithLength an arbitrary message to the provided writer with a varint length prefix.
EncodeWithLength(io.Writer, proto.Message) (int, error)
// Decodes to the provided message. The interface must be a pointer to the decoding destination.
Decode([]byte, interface{}) error
// DecodeWithLength a bytes from a reader with a varint length prefix. The interface must be a pointer to the
// decoding destination.
DecodeWithLength(io.Reader, interface{}) error
// Encode an arbitrary message to the provided writer. The interface must be a pointer object to encode.
Encode(io.Writer, interface{}) (int, error)
// EncodeWithLength an arbitrary message to the provided writer with a varint length prefix. The interface must be
// a pointer object to encode.
EncodeWithLength(io.Writer, interface{}) (int, error)
// ProtocolSuffix returns the last part of the protocol ID to indicate the encoding scheme.
ProtocolSuffix() string
}

View File

@@ -16,7 +16,7 @@ type SszNetworkEncoder struct {
UseSnappyCompression bool
}
func (e SszNetworkEncoder) doEncode(msg proto.Message) ([]byte, error) {
func (e SszNetworkEncoder) doEncode(msg interface{}) ([]byte, error) {
b, err := ssz.Marshal(msg)
if err != nil {
return nil, err
@@ -28,7 +28,7 @@ func (e SszNetworkEncoder) doEncode(msg proto.Message) ([]byte, error) {
}
// Encode the proto message to the io.Writer.
func (e SszNetworkEncoder) Encode(w io.Writer, msg proto.Message) (int, error) {
func (e SszNetworkEncoder) Encode(w io.Writer, msg interface{}) (int, error) {
if msg == nil {
return 0, nil
}
@@ -42,7 +42,7 @@ func (e SszNetworkEncoder) Encode(w io.Writer, msg proto.Message) (int, error) {
// EncodeWithLength the proto message to the io.Writer. This encoding prefixes the byte slice with a protobuf varint
// to indicate the size of the message.
func (e SszNetworkEncoder) EncodeWithLength(w io.Writer, msg proto.Message) (int, error) {
func (e SszNetworkEncoder) EncodeWithLength(w io.Writer, msg interface{}) (int, error) {
if msg == nil {
return 0, nil
}
@@ -55,7 +55,7 @@ func (e SszNetworkEncoder) EncodeWithLength(w io.Writer, msg proto.Message) (int
}
// Decode the bytes to the protobuf message provided.
func (e SszNetworkEncoder) Decode(b []byte, to proto.Message) error {
func (e SszNetworkEncoder) Decode(b []byte, to interface{}) error {
if e.UseSnappyCompression {
var err error
b, err = snappy.Decode(nil /*dst*/, b)
@@ -68,7 +68,7 @@ func (e SszNetworkEncoder) Decode(b []byte, to proto.Message) error {
}
// DecodeWithLength the bytes from io.Reader to the protobuf message provided.
func (e SszNetworkEncoder) DecodeWithLength(r io.Reader, to proto.Message) error {
func (e SszNetworkEncoder) DecodeWithLength(r io.Reader, to interface{}) error {
msgLen, err := readVarint(r)
if err != nil {
return err
@@ -85,7 +85,6 @@ func (e SszNetworkEncoder) DecodeWithLength(r io.Reader, to proto.Message) error
return err
}
}
return ssz.Unmarshal(b, to)
}

View File

@@ -6,6 +6,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
testpb "github.com/prysmaticlabs/prysm/proto/testing"
)
@@ -60,3 +61,13 @@ func testRoundTripWithLength(t *testing.T, e *encoder.SszNetworkEncoder) {
t.Error("Decoded message is not the same as original")
}
}
// Regression test to see that a block array response received from Sigma Prime's lighthouse would decode.
func TestLighthouseBeaconBlockResponse(t *testing.T) {
b := []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 28, 71, 156, 79, 199, 27, 222, 126, 43, 250, 217, 225, 182, 66, 10, 239, 42, 82, 185, 124, 196, 228, 234, 124, 248, 85, 153, 182, 92, 139, 53, 220, 172, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 224, 0, 0, 0, 224, 0, 0, 0, 224, 0, 0, 0, 224, 0, 0, 0, 224, 0, 0, 0, 224, 0, 0, 0}
decoded := make([]ethpb.BeaconBlock, 0)
e := &encoder.SszNetworkEncoder{UseSnappyCompression: false}
if err := e.Decode(b, &decoded); err != nil {
t.Fatal(err)
}
}

View File

@@ -104,7 +104,7 @@ func (s *InitialSync) Start() {
var last *eth.BeaconBlock
for headSlot := s.chain.HeadSlot(); headSlot < slotsSinceGenesis(genesis); {
req := &pb.BeaconBlocksRequest{
HeadSlot: headSlot,
HeadSlot: headSlot + 1,
HeadBlockRoot: s.chain.HeadRoot(),
Count: 64,
Step: 1,
@@ -127,14 +127,13 @@ func (s *InitialSync) Start() {
panic(errMsg.ErrorMessage)
}
resp := &pb.BeaconBlocksResponse{}
if err := s.p2p.Encoding().DecodeWithLength(strm, resp); err != nil {
panic(err)
resp := make([]*eth.BeaconBlock, 0)
if err := s.p2p.Encoding().DecodeWithLength(strm, &resp); err != nil {
log.Error(err)
continue
}
log.Infof("Received %d blocks", len(resp.Blocks))
for _, blk := range resp.Blocks {
for _, blk := range resp {
if blk.Slot <= headSlot {
continue
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
)
// beaconBlocksRPCHandler looks up the request blocks from the database from a given start block.
@@ -49,11 +50,11 @@ func (r *RegularSync) beaconBlocksRPCHandler(ctx context.Context, msg proto.Mess
}
return err
}
ret := &pb.BeaconBlocksResponse{}
ret := []*ethpb.BeaconBlock{}
for _, blk := range blks {
if (blk.Slot-startSlot)%m.Step == 0 {
ret.Blocks = append(ret.Blocks, blk)
ret = append(ret, blk)
}
}

View File

@@ -46,14 +46,14 @@ func TestBeaconBlocksRPCHandler_ReturnsBlocks(t *testing.T) {
p2.Host.SetStreamHandler(pcl, func(stream network.Stream) {
defer wg.Done()
expectSuccess(t, r, stream)
res := &pb.BeaconBlocksResponse{}
if err := r.p2p.Encoding().DecodeWithLength(stream, res); err != nil {
res := make([]ethpb.BeaconBlock, 0)
if err := r.p2p.Encoding().DecodeWithLength(stream, &res); err != nil {
t.Error(err)
}
if uint64(len(res.Blocks)) != req.Count {
t.Errorf("Received only %d blocks, expected %d", len(res.Blocks), req.Count)
if uint64(len(res)) != req.Count {
t.Errorf("Received only %d blocks, expected %d", len(res), req.Count)
}
for _, blk := range res.Blocks {
for _, blk := range res {
if (blk.Slot-req.HeadSlot)%req.Step != 0 {
t.Errorf("Received unexpected block slot %d", blk.Slot)
}

View File

@@ -8,6 +8,7 @@ import (
libp2pcore "github.com/libp2p/go-libp2p-core"
"github.com/pkg/errors"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
eth "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
)
@@ -32,7 +33,7 @@ func (r *RegularSync) recentBeaconBlocksRPCHandler(ctx context.Context, msg prot
}
return errors.New("no block roots provided")
}
ret := &pb.BeaconBlocksResponse{}
ret := make([]*eth.BeaconBlock, 0)
for _, root := range blockRoots {
blk, err := r.db.Block(ctx, bytesutil.ToBytes32(root))
if err != nil {
@@ -48,7 +49,7 @@ func (r *RegularSync) recentBeaconBlocksRPCHandler(ctx context.Context, msg prot
return err
}
// if block returned is nil, it appends nil to the slice
ret.Blocks = append(ret.Blocks, blk)
ret = append(ret, blk)
}
if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {

View File

@@ -53,14 +53,14 @@ func TestRecentBeaconBlocksRPCHandler_ReturnsBlocks(t *testing.T) {
p2.Host.SetStreamHandler(pcl, func(stream network.Stream) {
defer wg.Done()
expectSuccess(t, r, stream)
res := &pb.BeaconBlocksResponse{}
if err := r.p2p.Encoding().DecodeWithLength(stream, res); err != nil {
res := make([]*ethpb.BeaconBlock, 0)
if err := r.p2p.Encoding().DecodeWithLength(stream, &res); err != nil {
t.Error(err)
}
if len(res.Blocks) != len(req.BlockRoots) {
t.Errorf("Received only %d blocks, expected %d", len(res.Blocks), len(req.BlockRoots))
if len(res) != len(req.BlockRoots) {
t.Errorf("Received only %d blocks, expected %d", len(res), len(req.BlockRoots))
}
for i, blk := range res.Blocks {
for i, blk := range res {
if blk.Slot != uint64(i+1) {
t.Errorf("Received unexpected block slot %d but wanted %d", blk.Slot, i+1)
}

View File

@@ -36,10 +36,6 @@ message BeaconBlocksRequest {
uint64 step = 4;
}
message BeaconBlocksResponse {
repeated ethereum.eth.v1alpha1.BeaconBlock blocks = 1;
}
message RecentBeaconBlocksRequest {
repeated bytes block_roots = 1 [(gogoproto.moretags) = "ssz-size:\"?,32\""];
}

View File

@@ -271,4 +271,4 @@ func SplitCommaSeparated(arr []string) []string {
result = append(result, strings.Split(val, ",")...)
}
return result
}
}

View File

@@ -11,8 +11,8 @@ import (
"log"
"os"
"gopkg.in/yaml.v2"
keygen "github.com/prysmaticlabs/prysm/tools/unencrypted-keys-gen"
"gopkg.in/yaml.v2"
)
// KeyPair with hex encoded data.
@@ -48,7 +48,7 @@ func main() {
}
out.Keys = append(out.Keys, &keygen.UnencryptedKeys{
ValidatorKey: pk,
ValidatorKey: pk,
WithdrawalKey: pk,
})
}