P2P implement message send (#3278)

* return a stream with send, for reading response

* gofmt

* added sender impl

* fix imports
This commit is contained in:
Preston Van Loon
2019-08-22 19:02:46 -04:00
committed by GitHub
parent 8f01b76366
commit b59b3ec09c
16 changed files with 157 additions and 25 deletions

View File

@@ -3,6 +3,7 @@ test --test_verbose_timeout_warnings
# Only build test targets when running bazel test //...
test --build_tests_only
test --test_output=errors
# Fix for rules_docker. See: https://github.com/bazelbuild/rules_docker/issues/842
build --host_force_python=PY2

View File

@@ -67,6 +67,7 @@ go_test(
"//shared/testutil:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_peer//:go_default_library",
"@com_github_prysmaticlabs_go_ssz//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",

View File

@@ -53,6 +53,7 @@ go_test(
"//shared/hashutil:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_peer//:go_default_library",
"@com_github_prysmaticlabs_go_ssz//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",

View File

@@ -2,6 +2,7 @@ package initialsync
import (
"context"
"github.com/libp2p/go-libp2p-core/network"
"testing"
"time"
@@ -29,8 +30,8 @@ func (mp *mockP2P) Subscribe(msg proto.Message, channel chan p2p.Message) event.
func (mp *mockP2P) Broadcast(ctx context.Context, msg proto.Message) {}
func (mp *mockP2P) Send(ctx context.Context, msg proto.Message, peerID peer.ID) error {
return nil
func (mp *mockP2P) Send(ctx context.Context, msg proto.Message, peerID peer.ID) (network.Stream, error) {
return nil, nil
}
func (mp *mockP2P) Reputation(_ peer.ID, _ int) {

View File

@@ -79,7 +79,7 @@ func (s *InitialSync) requestBatchedBlocks(ctx context.Context, FinalizedRoot []
"finalizedBlkRoot": fmt.Sprintf("%#x", bytesutil.Trunc(FinalizedRoot[:])),
"headBlkRoot": fmt.Sprintf("%#x", bytesutil.Trunc(canonicalRoot[:]))},
).Debug("Requesting batched blocks")
if err := s.p2p.Send(ctx, &pb.BatchedBeaconBlockRequest{
if _, err := s.p2p.Send(ctx, &pb.BatchedBeaconBlockRequest{
FinalizedRoot: FinalizedRoot,
CanonicalRoot: canonicalRoot,
}, peer); err != nil {

View File

@@ -97,11 +97,14 @@ func (s *InitialSync) processState(msg p2p.Message, chainHead *pb.ChainHeadRespo
}
// requestStateFromPeer requests for the canonical state, finalized state, and justified state from a peer.
// DEPRECATED: Use github.com/prysmaticlabs/prysm/sync/initial-sync
func (s *InitialSync) requestStateFromPeer(ctx context.Context, lastFinalizedRoot [32]byte, peer peer.ID) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.requestStateFromPeer")
defer span.End()
stateReq.Inc()
return s.p2p.Send(ctx, &pb.BeaconStateRequest{
_, err := s.p2p.Send(ctx, &pb.BeaconStateRequest{
FinalizedStateRootHash32S: lastFinalizedRoot[:],
}, peer)
return err
}

View File

@@ -52,7 +52,7 @@ func (rs *RegularSync) receiveBlockAnnounce(msg p2p.Message) error {
log.WithField("blockRoot", fmt.Sprintf("%#x", bytesutil.Trunc(h[:]))).Debug("Received incoming block root, requesting full block data from sender")
// Request the full block data from peer that sent the block hash.
if err := rs.p2p.Send(ctx, &pb.BeaconBlockRequest{Hash: h[:]}, msg.Peer); err != nil {
if _, err := rs.p2p.Send(ctx, &pb.BeaconBlockRequest{Hash: h[:]}, msg.Peer); err != nil {
log.Error(err)
return err
}

View File

@@ -315,7 +315,7 @@ func (rs *RegularSync) handleStateRequest(msg deprecatedp2p.Message) error {
FinalizedState: fState,
FinalizedBlock: finalizedBlk,
}
if err := rs.p2p.Send(ctx, resp, msg.Peer); err != nil {
if _, err := rs.p2p.Send(ctx, resp, msg.Peer); err != nil {
log.Error(err)
return err
}
@@ -372,7 +372,7 @@ func (rs *RegularSync) handleChainHeadRequest(msg deprecatedp2p.Message) error {
ctx, ChainHead := trace.StartSpan(ctx, "sendChainHead")
defer ChainHead.End()
defer sentChainHead.Inc()
if err := rs.p2p.Send(ctx, req, msg.Peer); err != nil {
if _, err := rs.p2p.Send(ctx, req, msg.Peer); err != nil {
log.Error(err)
return err
}
@@ -488,7 +488,7 @@ func (rs *RegularSync) handleBlockRequestByHash(msg deprecatedp2p.Message) error
}
defer sentBlocks.Inc()
if err := rs.p2p.Send(ctx, &pb.BeaconBlockResponse{
if _, err := rs.p2p.Send(ctx, &pb.BeaconBlockResponse{
Block: block,
}, msg.Peer); err != nil {
log.Error(err)
@@ -515,7 +515,7 @@ func (rs *RegularSync) handleBatchedBlockRequest(msg deprecatedp2p.Message) erro
log.WithField("peer", msg.Peer).Debug("Sending response for batch blocks")
defer sentBatchedBlocks.Inc()
if err := rs.p2p.Send(ctx, &pb.BatchedBeaconBlockResponse{
if _, err := rs.p2p.Send(ctx, &pb.BatchedBeaconBlockResponse{
BatchedBlocks: response,
}, msg.Peer); err != nil {
log.Error(err)

View File

@@ -3,6 +3,7 @@ package sync
import (
"context"
"fmt"
"github.com/libp2p/go-libp2p-core/network"
"io/ioutil"
"reflect"
"strconv"
@@ -44,9 +45,9 @@ func (mp *mockP2P) Broadcast(ctx context.Context, msg proto.Message) error {
return nil
}
func (mp *mockP2P) Send(ctx context.Context, msg proto.Message, peerID peer.ID) error {
func (mp *mockP2P) Send(ctx context.Context, msg proto.Message, peerID peer.ID) (network.Stream, error) {
mp.sentMsg = msg
return nil
return nil, nil
}
func (mp *mockP2P) Reputation(_ peer.ID, val int) {

View File

@@ -13,6 +13,7 @@ go_library(
"interfaces.go",
"log.go",
"options.go",
"rpc_topic_mappings.go",
"sender.go",
"service.go",
"utils.go",
@@ -52,6 +53,7 @@ go_test(
"discovery_test.go",
"options_test.go",
"parameter_test.go",
"sender_test.go",
"service_test.go",
],
embed = [":go_default_library"],
@@ -65,6 +67,7 @@ go_test(
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_libp2p_go_libp2p//:go_default_library",
"@com_github_libp2p_go_libp2p_core//host:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",

View File

@@ -57,5 +57,5 @@ type HandshakeManager interface {
// Sender abstracts the sending functionality from libp2p.
type Sender interface {
Send(context.Context, proto.Message, peer.ID) error
Send(context.Context, proto.Message, peer.ID) (network.Stream, error)
}

View File

@@ -0,0 +1,28 @@
package p2p
import (
"reflect"
"github.com/gogo/protobuf/proto"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
)
// RPCTopicMappings represent the protocol ID to protobuf message type map for easy
// lookup. These mappings should be used for outbound sending only. Peers may respond
// with a different message type as defined by the p2p protocol.
var RPCTopicMappings = map[string]proto.Message{
"/eth2/beacon_chain/req/hello/1": &p2ppb.Hello{},
"/eth2/beacon_chain/req/goodbye/1": &p2ppb.Goodbye{},
"/eth2/beacon_chain/req/beacon_blocks/1": &p2ppb.BeaconBlocksRequest{},
"/eth2/beacon_chain/req/recent_beacon_blocks/1": &p2ppb.RecentBeaconBlocksRequest{},
}
// RPCTypeMapping is the inverse of RPCTopicMappings so that an arbitrary protobuf message
// can be mapped to a protocol ID string.
var RPCTypeMapping = make(map[reflect.Type]string)
func init() {
for k, v := range RPCTopicMappings {
RPCTypeMapping[reflect.TypeOf(v)] = k
}
}

View File

@@ -2,13 +2,37 @@ package p2p
import (
"context"
"reflect"
"time"
"github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)
// Send a message to a specific peer.
// TODO(3147): Implement.
func (s *Service) Send(ctx context.Context, message proto.Message, pid peer.ID) error {
return nil
// Send a message to a specific peer. The returned stream may be used for reading, but has been
// closed for writing.
func (s *Service) Send(ctx context.Context, message proto.Message, pid peer.ID) (network.Stream, error) {
topic := RPCTypeMapping[reflect.TypeOf(message)] + s.Encoding().ProtocolSuffix()
// TTFB_TIME (5s) + RESP_TIMEOUT (10s).
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
stream, err := s.host.NewStream(ctx, pid, protocol.ID(topic))
if err != nil {
return nil, err
}
if _, err := s.Encoding().Encode(stream, message); err != nil {
return nil, err
}
// Close stream for writing.
if err := stream.Close(); err != nil {
return nil, err
}
return stream, nil
}

View File

@@ -0,0 +1,69 @@
package p2p
import (
"context"
"reflect"
"sync"
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p-core/network"
testp2p "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
testpb "github.com/prysmaticlabs/prysm/proto/testing"
"github.com/prysmaticlabs/prysm/shared/testutil"
)
func TestService_Send(t *testing.T) {
p1 := testp2p.NewTestP2P(t)
p2 := testp2p.NewTestP2P(t)
p1.Connect(p2)
svc := &Service{
host: p1.Host,
cfg: &Config{Encoding: "ssz"},
}
msg := &testpb.TestSimpleMessage{
Foo: []byte("hello"),
Bar: 55,
}
// Register testing topic.
RPCTypeMapping[reflect.TypeOf(msg)] = "/testing/1"
// Register external listener which will repeat the message back.
var wg sync.WaitGroup
wg.Add(1)
go func() {
p2.SetStreamHandler("/testing/1/ssz", func(stream network.Stream) {
rcvd := &testpb.TestSimpleMessage{}
if err := svc.Encoding().Decode(stream, rcvd); err != nil {
t.Fatal(err)
}
if _, err := svc.Encoding().Encode(stream, rcvd); err != nil {
t.Fatal(err)
}
if err := stream.Close(); err != nil {
t.Error(err)
}
wg.Done()
})
}()
stream, err := svc.Send(context.Background(), msg, p2.Host.ID())
if err != nil {
t.Fatal(err)
}
testutil.WaitTimeout(&wg, 1*time.Second)
rcvd := &testpb.TestSimpleMessage{}
if err := svc.Encoding().Decode(stream, rcvd); err != nil {
t.Fatal(err)
}
if !proto.Equal(rcvd, msg) {
t.Errorf("Expected identical message to be received. got %v want %v", rcvd, msg)
}
}

View File

@@ -140,9 +140,8 @@ func (p *TestP2P) AddHandshake(pid peer.ID, hello *pb.Hello) {
}
// Send a message to a specific peer.
func (p *TestP2P) Send(ctx context.Context, msg proto.Message, pid peer.ID) error {
// TODO(3147): add this.
return nil
func (p *TestP2P) Send(ctx context.Context, msg proto.Message, pid peer.ID) (network.Stream, error) {
return nil, nil
}
// Subscribe to some topic. Not implemented.

View File

@@ -16,6 +16,7 @@ import (
ds "github.com/ipfs/go-datastore"
dsync "github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/network"
host "github.com/libp2p/go-libp2p-host"
kaddht "github.com/libp2p/go-libp2p-kad-dht"
dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts"
@@ -44,7 +45,7 @@ const maxMessageSize = 1 << 24
// Sender represents a struct that is able to relay information via p2p.
// Server implements this interface.
type Sender interface {
Send(ctx context.Context, msg proto.Message, peer peer.ID) error
Send(ctx context.Context, msg proto.Message, peer peer.ID) (network.Stream, error)
}
// Server is a placeholder for a p2p service. To be designed.
@@ -426,7 +427,7 @@ func (s *Server) Subscribe(msg proto.Message, channel chan Message) event.Subscr
// Send a message to a specific peer. If the peerID is set to p2p.AnyPeer, then
// this method will act as a broadcast.
func (s *Server) Send(ctx context.Context, msg proto.Message, peerID peer.ID) error {
func (s *Server) Send(ctx context.Context, msg proto.Message, peerID peer.ID) (network.Stream, error) {
isPeer := false
for _, p := range s.host.Network().Peers() {
if p == peerID {
@@ -437,7 +438,7 @@ func (s *Server) Send(ctx context.Context, msg proto.Message, peerID peer.ID) er
if peerID == AnyPeer || s.host.Network().Connectedness(peerID) == libp2pnet.CannotConnect || !isPeer {
s.Broadcast(ctx, msg)
return nil
return nil, nil
}
ctx, span := trace.StartSpan(ctx, "p2p.Send")
@@ -449,7 +450,7 @@ func (s *Server) Send(ctx context.Context, msg proto.Message, peerID peer.ID) er
pid := protocol.ID(prysmProtocolPrefix + "/" + topic)
stream, err := s.host.NewStream(ctx, peerID, pid)
if err != nil {
return err
return nil, err
}
defer stream.Close()
@@ -458,7 +459,7 @@ func (s *Server) Send(ctx context.Context, msg proto.Message, peerID peer.ID) er
b, err := proto.Marshal(msg)
if err != nil {
return err
return nil, err
}
envelope := &pb.Envelope{
@@ -467,7 +468,7 @@ func (s *Server) Send(ctx context.Context, msg proto.Message, peerID peer.ID) er
Timestamp: types.TimestampNow(),
}
return w.WriteMsg(envelope)
return nil, w.WriteMsg(envelope)
}
// Broadcast publishes a message to all localized peers using gossipsub.