mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
@@ -25,6 +25,7 @@ go_library(
|
||||
"@com_github_ethereum_go_ethereum//common:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//ethclient:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//rpc:go_default_library",
|
||||
"@com_github_golang_protobuf//proto:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
"@com_github_urfave_cli//:go_default_library",
|
||||
],
|
||||
|
||||
@@ -1,22 +1,23 @@
|
||||
package node
|
||||
|
||||
import (
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/prysmaticlabs/prysm/shared/p2p"
|
||||
|
||||
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
||||
)
|
||||
|
||||
var topicMappings = map[pb.Topic]interface{}{
|
||||
pb.Topic_BEACON_BLOCK_HASH_ANNOUNCE: pb.BeaconBlockHashAnnounce{},
|
||||
pb.Topic_BEACON_BLOCK_REQUEST: pb.BeaconBlockRequest{},
|
||||
pb.Topic_BEACON_BLOCK_REQUEST_BY_SLOT_NUMBER: pb.BeaconBlockRequestBySlotNumber{},
|
||||
pb.Topic_BEACON_BLOCK_RESPONSE: pb.BeaconBlockResponse{},
|
||||
pb.Topic_CRYSTALLIZED_STATE_HASH_ANNOUNCE: pb.CrystallizedStateHashAnnounce{},
|
||||
pb.Topic_CRYSTALLIZED_STATE_REQUEST: pb.CrystallizedStateRequest{},
|
||||
pb.Topic_CRYSTALLIZED_STATE_RESPONSE: pb.CrystallizedStateResponse{},
|
||||
pb.Topic_ACTIVE_STATE_HASH_ANNOUNCE: pb.ActiveStateHashAnnounce{},
|
||||
pb.Topic_ACTIVE_STATE_REQUEST: pb.ActiveStateRequest{},
|
||||
pb.Topic_ACTIVE_STATE_RESPONSE: pb.ActiveStateResponse{},
|
||||
var topicMappings = map[pb.Topic]proto.Message{
|
||||
pb.Topic_BEACON_BLOCK_HASH_ANNOUNCE: &pb.BeaconBlockHashAnnounce{},
|
||||
pb.Topic_BEACON_BLOCK_REQUEST: &pb.BeaconBlockRequest{},
|
||||
pb.Topic_BEACON_BLOCK_REQUEST_BY_SLOT_NUMBER: &pb.BeaconBlockRequestBySlotNumber{},
|
||||
pb.Topic_BEACON_BLOCK_RESPONSE: &pb.BeaconBlockResponse{},
|
||||
pb.Topic_CRYSTALLIZED_STATE_HASH_ANNOUNCE: &pb.CrystallizedStateHashAnnounce{},
|
||||
pb.Topic_CRYSTALLIZED_STATE_REQUEST: &pb.CrystallizedStateRequest{},
|
||||
pb.Topic_CRYSTALLIZED_STATE_RESPONSE: &pb.CrystallizedStateResponse{},
|
||||
pb.Topic_ACTIVE_STATE_HASH_ANNOUNCE: &pb.ActiveStateHashAnnounce{},
|
||||
pb.Topic_ACTIVE_STATE_REQUEST: &pb.ActiveStateRequest{},
|
||||
pb.Topic_ACTIVE_STATE_RESPONSE: &pb.ActiveStateResponse{},
|
||||
}
|
||||
|
||||
func configureP2P() (*p2p.Server, error) {
|
||||
|
||||
@@ -28,6 +28,7 @@ go_test(
|
||||
"//shared/testutil:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//common:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//event:go_default_library",
|
||||
"@com_github_golang_protobuf//proto:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
|
||||
],
|
||||
|
||||
@@ -117,7 +117,7 @@ func (sim *Simulator) lastSimulatedSessionBlock() (*types.Block, error) {
|
||||
}
|
||||
|
||||
func (sim *Simulator) run(delayChan <-chan time.Time, done <-chan struct{}) {
|
||||
blockReqSub := sim.p2p.Subscribe(pb.BeaconBlockRequest{}, sim.blockRequestChan)
|
||||
blockReqSub := sim.p2p.Subscribe(&pb.BeaconBlockRequest{}, sim.blockRequestChan)
|
||||
defer blockReqSub.Unsubscribe()
|
||||
|
||||
// Check if we saved a simulated block in the DB from a previous session.
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/types"
|
||||
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
||||
"github.com/prysmaticlabs/prysm/shared/database"
|
||||
@@ -25,13 +26,13 @@ func init() {
|
||||
|
||||
type mockP2P struct{}
|
||||
|
||||
func (mp *mockP2P) Subscribe(msg interface{}, channel interface{}) event.Subscription {
|
||||
func (mp *mockP2P) Subscribe(msg proto.Message, channel chan p2p.Message) event.Subscription {
|
||||
return new(event.Feed).Subscribe(channel)
|
||||
}
|
||||
|
||||
func (mp *mockP2P) Broadcast(msg interface{}) {}
|
||||
func (mp *mockP2P) Broadcast(msg proto.Message) {}
|
||||
|
||||
func (mp *mockP2P) Send(msg interface{}, peer p2p.Peer) {}
|
||||
func (mp *mockP2P) Send(msg proto.Message, peer p2p.Peer) {}
|
||||
|
||||
type mockPOWChainService struct{}
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ go_test(
|
||||
"//shared/p2p:go_default_library",
|
||||
"//shared/testutil:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//event:go_default_library",
|
||||
"@com_github_golang_protobuf//proto:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
|
||||
"@org_golang_x_crypto//blake2b:go_default_library",
|
||||
|
||||
@@ -23,6 +23,7 @@ go_test(
|
||||
"//shared/p2p:go_default_library",
|
||||
"//shared/testutil:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//event:go_default_library",
|
||||
"@com_github_golang_protobuf//proto:go_default_library",
|
||||
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
@@ -128,8 +128,8 @@ func (s *InitialSync) Stop() error {
|
||||
// delayChan is explicitly passed into this function to facilitate tests that don't require a timeout.
|
||||
// It is assumed that the goroutine `run` is only called once per instance.
|
||||
func (s *InitialSync) run(delaychan <-chan time.Time) {
|
||||
blockSub := s.p2p.Subscribe(pb.BeaconBlockResponse{}, s.blockBuf)
|
||||
crystallizedStateSub := s.p2p.Subscribe(pb.CrystallizedStateResponse{}, s.crystallizedStateBuf)
|
||||
blockSub := s.p2p.Subscribe(&pb.BeaconBlockResponse{}, s.blockBuf)
|
||||
crystallizedStateSub := s.p2p.Subscribe(&pb.CrystallizedStateResponse{}, s.crystallizedStateBuf)
|
||||
defer func() {
|
||||
blockSub.Unsubscribe()
|
||||
crystallizedStateSub.Unsubscribe()
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/types"
|
||||
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
||||
"github.com/prysmaticlabs/prysm/shared/p2p"
|
||||
@@ -16,13 +17,13 @@ import (
|
||||
type mockP2P struct {
|
||||
}
|
||||
|
||||
func (mp *mockP2P) Subscribe(msg interface{}, channel interface{}) event.Subscription {
|
||||
func (mp *mockP2P) Subscribe(msg proto.Message, channel chan p2p.Message) event.Subscription {
|
||||
return new(event.Feed).Subscribe(channel)
|
||||
}
|
||||
|
||||
func (mp *mockP2P) Broadcast(msg interface{}) {}
|
||||
func (mp *mockP2P) Broadcast(msg proto.Message) {}
|
||||
|
||||
func (mp *mockP2P) Send(msg interface{}, peer p2p.Peer) {
|
||||
func (mp *mockP2P) Send(msg proto.Message, peer p2p.Peer) {
|
||||
}
|
||||
|
||||
type mockChainService struct {
|
||||
|
||||
@@ -123,9 +123,9 @@ func (ss *Service) ReceiveBlockHash(data *pb.BeaconBlockHashAnnounce, peer p2p.P
|
||||
|
||||
// run handles incoming block sync.
|
||||
func (ss *Service) run() {
|
||||
announceBlockHashSub := ss.p2p.Subscribe(pb.BeaconBlockHashAnnounce{}, ss.announceBlockHashBuf)
|
||||
blockSub := ss.p2p.Subscribe(pb.BeaconBlockResponse{}, ss.blockBuf)
|
||||
blockRequestSub := ss.p2p.Subscribe(pb.BeaconBlockRequestBySlotNumber{}, ss.blockRequestBySlot)
|
||||
announceBlockHashSub := ss.p2p.Subscribe(&pb.BeaconBlockHashAnnounce{}, ss.announceBlockHashBuf)
|
||||
blockSub := ss.p2p.Subscribe(&pb.BeaconBlockResponse{}, ss.blockBuf)
|
||||
blockRequestSub := ss.p2p.Subscribe(&pb.BeaconBlockRequestBySlotNumber{}, ss.blockRequestBySlot)
|
||||
|
||||
defer announceBlockHashSub.Unsubscribe()
|
||||
defer blockSub.Unsubscribe()
|
||||
|
||||
@@ -6,9 +6,9 @@ import (
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/types"
|
||||
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/types"
|
||||
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
||||
"github.com/prysmaticlabs/prysm/shared/p2p"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil"
|
||||
@@ -25,13 +25,13 @@ func init() {
|
||||
type mockP2P struct {
|
||||
}
|
||||
|
||||
func (mp *mockP2P) Subscribe(msg interface{}, channel interface{}) event.Subscription {
|
||||
func (mp *mockP2P) Subscribe(msg proto.Message, channel chan p2p.Message) event.Subscription {
|
||||
return new(event.Feed).Subscribe(channel)
|
||||
}
|
||||
|
||||
func (mp *mockP2P) Broadcast(msg interface{}) {}
|
||||
func (mp *mockP2P) Broadcast(msg proto.Message) {}
|
||||
|
||||
func (mp *mockP2P) Send(msg interface{}, peer p2p.Peer) {
|
||||
func (mp *mockP2P) Send(msg proto.Message, peer p2p.Peer) {
|
||||
}
|
||||
|
||||
type mockChainService struct {
|
||||
|
||||
@@ -7,14 +7,15 @@ import (
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
gethTypes "github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/prysmaticlabs/prysm/shared/p2p"
|
||||
)
|
||||
|
||||
// P2P defines a struct that can subscribe to feeds, request data, and broadcast data.
|
||||
type P2P interface {
|
||||
Subscribe(msg interface{}, channel interface{}) event.Subscription
|
||||
Send(msg interface{}, peer p2p.Peer)
|
||||
Broadcast(msg interface{})
|
||||
Subscribe(msg proto.Message, channel chan p2p.Message) event.Subscription
|
||||
Send(msg proto.Message, peer p2p.Peer)
|
||||
Broadcast(msg proto.Message)
|
||||
}
|
||||
|
||||
// CanonicalEventAnnouncer defines a struct that pushes canonical blocks
|
||||
|
||||
@@ -33,6 +33,7 @@ go_test(
|
||||
srcs = [
|
||||
"feed_example_test.go",
|
||||
"feed_test.go",
|
||||
"message_test.go",
|
||||
"options_test.go",
|
||||
"register_topic_example_test.go",
|
||||
"service_test.go",
|
||||
@@ -44,6 +45,7 @@ go_test(
|
||||
"//shared:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//event:go_default_library",
|
||||
"@com_github_golang_protobuf//proto:go_default_library",
|
||||
"@com_github_ipfs_go_log//:go_default_library",
|
||||
"@com_github_libp2p_go_floodsub//:go_default_library",
|
||||
"@com_github_libp2p_go_floodsub//pb:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_blankhost//:go_default_library",
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
"github.com/golang/protobuf/proto"
|
||||
)
|
||||
|
||||
// Feed is a one to many subscription feed of the argument type.
|
||||
@@ -17,23 +16,15 @@ import (
|
||||
// contains information about the sender, aka the peer, and the message payload
|
||||
// itself.
|
||||
//
|
||||
// feed, err := ps.Feed(MyMessage{})
|
||||
// feed, err := ps.Feed(&pb.MyMessage{})
|
||||
// ch := make(chan p2p.Message, 100) // Choose a reasonable buffer size!
|
||||
// sub := feed.Subscribe(ch)
|
||||
//
|
||||
// // Wait until my message comes from a peer.
|
||||
// msg := <- ch
|
||||
// fmt.Printf("Message received: %v", msg.Data)
|
||||
func (s *Server) Feed(msg interface{}) *event.Feed {
|
||||
var t reflect.Type
|
||||
|
||||
// Support passing reflect.Type as the msg.
|
||||
switch msg.(type) {
|
||||
case reflect.Type:
|
||||
t = msg.(reflect.Type)
|
||||
default:
|
||||
t = reflect.TypeOf(msg)
|
||||
}
|
||||
func (s *Server) Feed(msg proto.Message) *event.Feed {
|
||||
t := messageType(msg)
|
||||
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
@@ -14,7 +14,7 @@ func ExampleServer_Feed() {
|
||||
}
|
||||
|
||||
// Let's wait for a puzzle from our peers then try to solve it.
|
||||
feed := s.Feed(pb.Puzzle{})
|
||||
feed := s.Feed(&pb.Puzzle{})
|
||||
|
||||
ch := make(chan Message, 5) // Small buffer size. I don't expect many puzzles.
|
||||
sub := feed.Subscribe(ch)
|
||||
|
||||
@@ -2,26 +2,26 @@ package p2p
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
"github.com/golang/protobuf/proto"
|
||||
testpb "github.com/prysmaticlabs/prysm/proto/testing"
|
||||
)
|
||||
|
||||
func TestFeed_ReturnsSameFeed(t *testing.T) {
|
||||
tests := []struct {
|
||||
a interface{}
|
||||
b interface{}
|
||||
a proto.Message
|
||||
b proto.Message
|
||||
want bool
|
||||
}{
|
||||
// Equality tests
|
||||
{a: 1, b: 2, want: true},
|
||||
{a: 'a', b: 'b', want: true},
|
||||
{a: struct{ c int }{c: 1}, b: struct{ c int }{c: 2}, want: true},
|
||||
{a: struct{ c string }{c: "a"}, b: struct{ c string }{c: "b"}, want: true},
|
||||
{a: reflect.TypeOf(struct{ c int }{c: 1}), b: struct{ c int }{c: 2}, want: true},
|
||||
{a: &testpb.TestMessage{}, b: &testpb.TestMessage{}, want: true},
|
||||
{a: &testpb.Puzzle{}, b: &testpb.Puzzle{}, want: true},
|
||||
// Inequality tests
|
||||
{a: 1, b: '2', want: false},
|
||||
{a: 'a', b: 1, want: false},
|
||||
{a: struct{ c int }{c: 1}, b: struct{ c int64 }{c: 2}, want: false},
|
||||
{a: struct{ c string }{c: "a"}, b: struct{ c float64 }{c: 3.4}, want: false},
|
||||
{a: &testpb.TestMessage{}, b: &testpb.Puzzle{}, want: false},
|
||||
{a: &testpb.Puzzle{}, b: &testpb.TestMessage{}, want: false},
|
||||
}
|
||||
|
||||
s, _ := NewServer()
|
||||
@@ -37,12 +37,12 @@ func TestFeed_ReturnsSameFeed(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFeed_ConcurrentWrite(t *testing.T) {
|
||||
s, err := NewServer()
|
||||
if err != nil {
|
||||
t.Fatalf("could not create server %v", err)
|
||||
s := Server{
|
||||
feeds: make(map[reflect.Type]*event.Feed),
|
||||
mutex: &sync.Mutex{},
|
||||
}
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
go s.Feed("a")
|
||||
go s.Feed(&testpb.TestMessage{})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
)
|
||||
|
||||
@@ -11,3 +13,11 @@ type Message struct {
|
||||
// Data can be any type of message found in sharding/p2p/proto package.
|
||||
Data proto.Message
|
||||
}
|
||||
|
||||
// messageType returns the underlying struct type for a given proto.message.
|
||||
func messageType(msg proto.Message) reflect.Type {
|
||||
// proto.Message is a pointer and we need to dereference the pointer
|
||||
// and take the type of the original struct. Otherwise reflect.TypeOf will
|
||||
// create a new value of type **pb.BeaconBlockHashAnnounce for example.
|
||||
return reflect.ValueOf(msg).Elem().Type()
|
||||
}
|
||||
|
||||
35
shared/p2p/message_test.go
Normal file
35
shared/p2p/message_test.go
Normal file
@@ -0,0 +1,35 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
testpb "github.com/prysmaticlabs/prysm/proto/testing"
|
||||
)
|
||||
|
||||
func TestMessageType(t *testing.T) {
|
||||
tests := []struct {
|
||||
msg proto.Message
|
||||
expected reflect.Type
|
||||
}{
|
||||
{
|
||||
msg: &testpb.TestMessage{},
|
||||
expected: reflect.TypeOf(testpb.TestMessage{}),
|
||||
},
|
||||
{
|
||||
msg: &testpb.Puzzle{},
|
||||
expected: reflect.TypeOf(testpb.Puzzle{}),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(fmt.Sprintf("%v", tt.expected), func(t *testing.T) {
|
||||
got := messageType(tt.msg)
|
||||
if got != tt.expected {
|
||||
t.Errorf("Wanted %v but got %v", tt.expected, got)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package p2p
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
|
||||
@@ -81,12 +82,12 @@ func (s *Server) Stop() error {
|
||||
//
|
||||
// The topics can originate from multiple sources. In other words, messages on
|
||||
// TopicA may come from direct peer communication or a pub/sub channel.
|
||||
func (s *Server) RegisterTopic(topic string, message interface{}, adapters ...Adapter) {
|
||||
msgType := reflect.TypeOf(message)
|
||||
func (s *Server) RegisterTopic(topic string, message proto.Message, adapters ...Adapter) {
|
||||
log.WithFields(logrus.Fields{
|
||||
"topic": topic,
|
||||
}).Debug("Subscribing to topic")
|
||||
|
||||
msgType := messageType(message)
|
||||
s.topicMapping[msgType] = topic
|
||||
|
||||
sub, err := s.gsub.Subscribe(topic)
|
||||
@@ -94,7 +95,7 @@ func (s *Server) RegisterTopic(topic string, message interface{}, adapters ...Ad
|
||||
log.Errorf("Failed to subscribe to topic: %v", err)
|
||||
return
|
||||
}
|
||||
feed := s.Feed(msgType)
|
||||
feed := s.Feed(message)
|
||||
|
||||
// Reverse adapter order
|
||||
for i := len(adapters)/2 - 1; i >= 0; i-- {
|
||||
@@ -130,15 +131,15 @@ func (s *Server) RegisterTopic(topic string, message interface{}, adapters ...Ad
|
||||
h(s.ctx, pMsg)
|
||||
}
|
||||
}()
|
||||
|
||||
}
|
||||
|
||||
func (s *Server) emit(feed *event.Feed, msg *floodsub.Message, msgType reflect.Type) {
|
||||
d, ok := reflect.New(msgType).Interface().(proto.Message)
|
||||
if !ok {
|
||||
log.Error("Received message is not a protobuf message")
|
||||
log.Errorf("Received message is not a protobuf message: %s", msgType)
|
||||
return
|
||||
}
|
||||
|
||||
if err := proto.Unmarshal(msg.Data, d); err != nil {
|
||||
log.Errorf("Failed to decode data: %v", err)
|
||||
return
|
||||
@@ -147,17 +148,17 @@ func (s *Server) emit(feed *event.Feed, msg *floodsub.Message, msgType reflect.T
|
||||
i := feed.Send(Message{Data: d})
|
||||
log.WithFields(logrus.Fields{
|
||||
"numSubs": i,
|
||||
}).Debug("Sent a request to subs")
|
||||
|
||||
"msgType": fmt.Sprintf("%T", d),
|
||||
}).Debug("Emit p2p message to feed subscribers")
|
||||
}
|
||||
|
||||
// Subscribe returns a subscription to a feed of msg's Type and adds the channels to the feed.
|
||||
func (s *Server) Subscribe(msg interface{}, channel interface{}) event.Subscription {
|
||||
func (s *Server) Subscribe(msg proto.Message, channel chan Message) event.Subscription {
|
||||
return s.Feed(msg).Subscribe(channel)
|
||||
}
|
||||
|
||||
// Send a message to a specific peer.
|
||||
func (s *Server) Send(msg interface{}, peer Peer) {
|
||||
func (s *Server) Send(msg proto.Message, peer Peer) {
|
||||
// TODO
|
||||
// https://github.com/prysmaticlabs/prysm/issues/175
|
||||
|
||||
@@ -170,12 +171,12 @@ func (s *Server) Send(msg interface{}, peer Peer) {
|
||||
}
|
||||
|
||||
// Broadcast a message to the world.
|
||||
func (s *Server) Broadcast(msg interface{}) {
|
||||
func (s *Server) Broadcast(msg proto.Message) {
|
||||
// TODO: https://github.com/prysmaticlabs/prysm/issues/176
|
||||
topic := s.topicMapping[reflect.TypeOf(msg)]
|
||||
topic := s.topicMapping[messageType(msg)]
|
||||
log.WithFields(logrus.Fields{
|
||||
"topic": topic,
|
||||
}).Debugf("Broadcasting msg %s", msg)
|
||||
}).Debugf("Broadcasting msg %+v", msg)
|
||||
|
||||
if topic == "" {
|
||||
log.Warnf("Topic is unknown for message type %T. %v", msg, msg)
|
||||
|
||||
@@ -2,7 +2,6 @@ package p2p
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -11,6 +10,7 @@ import (
|
||||
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
"github.com/golang/protobuf/proto"
|
||||
ipfslog "github.com/ipfs/go-log"
|
||||
floodsub "github.com/libp2p/go-floodsub"
|
||||
floodsubPb "github.com/libp2p/go-floodsub/pb"
|
||||
bhost "github.com/libp2p/go-libp2p-blankhost"
|
||||
@@ -27,7 +27,7 @@ var _ = shared.Service(&Server{})
|
||||
|
||||
func init() {
|
||||
logrus.SetLevel(logrus.DebugLevel)
|
||||
logrus.SetOutput(ioutil.Discard)
|
||||
ipfslog.SetDebugLogging()
|
||||
}
|
||||
|
||||
func TestBroadcast(t *testing.T) {
|
||||
@@ -46,7 +46,7 @@ func TestEmitFailsNonProtobuf(t *testing.T) {
|
||||
s, _ := NewServer()
|
||||
hook := logTest.NewGlobal()
|
||||
s.emit(nil /*feed*/, nil /*msg*/, reflect.TypeOf(""))
|
||||
want := "Received message is not a protobuf message"
|
||||
want := "Received message is not a protobuf message: string"
|
||||
if hook.LastEntry().Message != want {
|
||||
t.Errorf("Expected log to contain %s. Got = %s", want, hook.LastEntry().Message)
|
||||
}
|
||||
@@ -87,7 +87,7 @@ func TestSubscribeToTopic(t *testing.T) {
|
||||
topicMapping: make(map[reflect.Type]string),
|
||||
}
|
||||
|
||||
feed := s.Feed(shardpb.CollationBodyRequest{})
|
||||
feed := s.Feed(&shardpb.CollationBodyRequest{})
|
||||
ch := make(chan Message)
|
||||
sub := feed.Subscribe(ch)
|
||||
defer sub.Unsubscribe()
|
||||
@@ -115,7 +115,7 @@ func TestSubscribe(t *testing.T) {
|
||||
}
|
||||
|
||||
ch := make(chan Message)
|
||||
sub := s.Subscribe(shardpb.CollationBodyRequest{}, ch)
|
||||
sub := s.Subscribe(&shardpb.CollationBodyRequest{}, ch)
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
testSubscribe(ctx, t, s, gsub, ch)
|
||||
@@ -124,7 +124,7 @@ func TestSubscribe(t *testing.T) {
|
||||
func testSubscribe(ctx context.Context, t *testing.T, s Server, gsub *floodsub.PubSub, ch chan Message) {
|
||||
topic := shardpb.Topic_COLLATION_BODY_REQUEST
|
||||
|
||||
go s.RegisterTopic(topic.String(), shardpb.CollationBodyRequest{})
|
||||
s.RegisterTopic(topic.String(), &shardpb.CollationBodyRequest{})
|
||||
|
||||
// Short delay to let goroutine add subscription.
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
@@ -165,14 +165,15 @@ func testSubscribe(ctx context.Context, t *testing.T, s Server, gsub *floodsub.P
|
||||
}
|
||||
|
||||
func TestRegisterTopic_WithoutAdapters(t *testing.T) {
|
||||
t.Skip("Currently failing to simulate incoming p2p messages. See github.com/prysmaticlabs/prysm/issues/488")
|
||||
s, err := NewServer()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create new server: %v", err)
|
||||
}
|
||||
topic := "test_topic"
|
||||
testMessage := testpb.TestMessage{Foo: "bar"}
|
||||
testMessage := &testpb.TestMessage{Foo: "bar"}
|
||||
|
||||
s.RegisterTopic(topic, testpb.TestMessage{})
|
||||
s.RegisterTopic(topic, testMessage)
|
||||
|
||||
ch := make(chan Message)
|
||||
sub := s.Subscribe(testMessage, ch)
|
||||
@@ -181,10 +182,17 @@ func TestRegisterTopic_WithoutAdapters(t *testing.T) {
|
||||
wait := make(chan struct{})
|
||||
go func() {
|
||||
defer close(wait)
|
||||
<-ch
|
||||
msg := <-ch
|
||||
tmsg := msg.Data.(*testpb.TestMessage)
|
||||
if tmsg.Foo != "bar" {
|
||||
t.Errorf("Expected test message Foo: \"bar\". Got: %v", tmsg)
|
||||
}
|
||||
}()
|
||||
|
||||
if err := simulateIncomingMessage(t, s, topic, []byte{}); err != nil {
|
||||
b, _ := proto.Marshal(testMessage)
|
||||
_ = b
|
||||
|
||||
if err := simulateIncomingMessage(t, s, topic, b); err != nil {
|
||||
t.Errorf("Failed to send to topic %s", topic)
|
||||
}
|
||||
|
||||
@@ -196,13 +204,14 @@ func TestRegisterTopic_WithoutAdapters(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegisterTopic_WithAdapers(t *testing.T) {
|
||||
func TestRegisterTopic_WithAdapters(t *testing.T) {
|
||||
t.Skip("Currently failing to simulate incoming p2p messages. See github.com/prysmaticlabs/prysm/issues/488")
|
||||
s, err := NewServer()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create new server: %v", err)
|
||||
}
|
||||
topic := "test_topic"
|
||||
testMessage := testpb.TestMessage{Foo: "bar"}
|
||||
testMessage := &testpb.TestMessage{Foo: "bar"}
|
||||
|
||||
i := 0
|
||||
var testAdapter Adapter = func(next Handler) Handler {
|
||||
@@ -220,7 +229,7 @@ func TestRegisterTopic_WithAdapers(t *testing.T) {
|
||||
testAdapter,
|
||||
}
|
||||
|
||||
s.RegisterTopic(topic, testpb.TestMessage{}, adapters...)
|
||||
s.RegisterTopic(topic, testMessage, adapters...)
|
||||
|
||||
ch := make(chan Message)
|
||||
sub := s.Subscribe(testMessage, ch)
|
||||
@@ -229,7 +238,11 @@ func TestRegisterTopic_WithAdapers(t *testing.T) {
|
||||
wait := make(chan struct{})
|
||||
go func() {
|
||||
defer close(wait)
|
||||
<-ch
|
||||
msg := <-ch
|
||||
tmsg := msg.Data.(*testpb.TestMessage)
|
||||
if tmsg.Foo != "bar" {
|
||||
t.Errorf("Expected test message Foo: \"bar\". Got: %v", tmsg)
|
||||
}
|
||||
}()
|
||||
|
||||
if err := simulateIncomingMessage(t, s, topic, []byte{}); err != nil {
|
||||
@@ -248,7 +261,8 @@ func TestRegisterTopic_WithAdapers(t *testing.T) {
|
||||
}
|
||||
|
||||
func simulateIncomingMessage(t *testing.T, s *Server, topic string, b []byte) error {
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
|
||||
|
||||
gsub, err := floodsub.NewFloodSub(ctx, h)
|
||||
|
||||
@@ -28,6 +28,7 @@ go_library(
|
||||
"//validator/rpcclient:go_default_library",
|
||||
"//validator/txpool:go_default_library",
|
||||
"//validator/types:go_default_library",
|
||||
"@com_github_golang_protobuf//proto:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
"@com_github_urfave_cli//:go_default_library",
|
||||
],
|
||||
|
||||
@@ -1,15 +1,16 @@
|
||||
package node
|
||||
|
||||
import (
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/prysmaticlabs/prysm/shared/p2p"
|
||||
|
||||
pb "github.com/prysmaticlabs/prysm/proto/sharding/p2p/v1"
|
||||
)
|
||||
|
||||
var topicMappings = map[pb.Topic]interface{}{
|
||||
pb.Topic_COLLATION_BODY_REQUEST: pb.CollationBodyRequest{},
|
||||
pb.Topic_COLLATION_BODY_RESPONSE: pb.CollationBodyResponse{},
|
||||
pb.Topic_TRANSACTIONS: pb.Transaction{},
|
||||
var topicMappings = map[pb.Topic]proto.Message{
|
||||
pb.Topic_COLLATION_BODY_REQUEST: &pb.CollationBodyRequest{},
|
||||
pb.Topic_COLLATION_BODY_RESPONSE: &pb.CollationBodyResponse{},
|
||||
pb.Topic_TRANSACTIONS: &pb.Transaction{},
|
||||
}
|
||||
|
||||
func configureP2P() (*p2p.Server, error) {
|
||||
|
||||
Reference in New Issue
Block a user