p2p fixes / improvements (#1931)

* don't broadcast to yourself

* todo

* Also listen for incoming streams

* send to a specific peer

* add godoc comment

* handle EOF as a normal thing

* gazelle

* add test for subscribing via direct comm
This commit is contained in:
Preston Van Loon
2019-03-07 21:37:15 -05:00
committed by GitHub
parent 2f9de6f93c
commit 2e7b08f97e
15 changed files with 251 additions and 108 deletions

View File

@@ -50,6 +50,7 @@ go_test(
"//shared/params:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_libp2p_go_libp2p_peer//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],

View File

@@ -17,6 +17,7 @@ go_library(
"//shared/p2p:go_default_library",
"//shared/params:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_libp2p_go_libp2p_peer//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
@@ -40,6 +41,7 @@ go_test(
"//shared/params:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_libp2p_go_libp2p_peer//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)

View File

@@ -18,6 +18,7 @@ import (
"time"
"github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p-peer"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
@@ -60,8 +61,8 @@ func DefaultConfig() *Config {
}
type p2pAPI interface {
p2p.Sender
Subscribe(msg proto.Message, channel chan p2p.Message) event.Subscription
Send(msg proto.Message, peer p2p.Peer)
Broadcast(msg proto.Message)
}
@@ -200,7 +201,7 @@ func (s *InitialSync) run(delayChan <-chan time.Time) {
}()
if s.reqState {
if err := s.requestStateFromPeer(s.ctx, s.stateRootOfHighestObservedSlot[:], p2p.Peer{}); err != nil {
if err := s.requestStateFromPeer(s.ctx, s.stateRootOfHighestObservedSlot[:], p2p.AnyPeer); err != nil {
log.Errorf("Could not request state from peer %v", err)
}
} else {
@@ -244,7 +245,7 @@ func (s *InitialSync) checkInMemoryBlocks() {
}
s.mutex.Lock()
if block, ok := s.inMemoryBlocks[s.currentSlot+1]; ok && s.currentSlot+1 <= s.highestObservedSlot {
s.processBlock(s.ctx, block, p2p.Peer{})
s.processBlock(s.ctx, block, p2p.AnyPeer)
}
s.mutex.Unlock()
}
@@ -255,7 +256,7 @@ func (s *InitialSync) checkInMemoryBlocks() {
// latest canonical head. If not, then it requests batched blocks up to the highest observed slot.
func (s *InitialSync) checkSyncStatus() bool {
if s.reqState {
if err := s.requestStateFromPeer(s.ctx, s.stateRootOfHighestObservedSlot[:], p2p.Peer{}); err != nil {
if err := s.requestStateFromPeer(s.ctx, s.stateRootOfHighestObservedSlot[:], p2p.AnyPeer); err != nil {
log.Errorf("Could not request state from peer %v", err)
}
return false
@@ -278,7 +279,7 @@ func (s *InitialSync) processBlockAnnounce(msg p2p.Message) {
recBlockAnnounce.Inc()
if s.reqState {
if err := s.requestStateFromPeer(ctx, s.stateRootOfHighestObservedSlot[:], p2p.Peer{}); err != nil {
if err := s.requestStateFromPeer(ctx, s.stateRootOfHighestObservedSlot[:], msg.Peer); err != nil {
log.Errorf("Could not request state from peer %v", err)
}
return
@@ -295,7 +296,7 @@ func (s *InitialSync) processBlockAnnounce(msg p2p.Message) {
// processBlock is the main method that validates each block which is received
// for initial sync. It checks if the blocks are valid and then will continue to
// process and save it into the db.
func (s *InitialSync) processBlock(ctx context.Context, block *pb.BeaconBlock, peer p2p.Peer) {
func (s *InitialSync) processBlock(ctx context.Context, block *pb.BeaconBlock, peerID peer.ID) {
ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.processBlock")
defer span.End()
recBlock.Inc()
@@ -310,7 +311,7 @@ func (s *InitialSync) processBlock(ctx context.Context, block *pb.BeaconBlock, p
// requesting beacon state if there is no saved state.
if s.reqState {
if err := s.requestStateFromPeer(s.ctx, block.StateRootHash32, peer); err != nil {
if err := s.requestStateFromPeer(s.ctx, block.StateRootHash32, peerID); err != nil {
log.Errorf("Could not request beacon state from peer: %v", err)
}
return
@@ -387,13 +388,12 @@ func (s *InitialSync) processState(msg p2p.Message) {
// requestStateFromPeer sends a request to a peer for the corresponding state
// for a beacon block.
func (s *InitialSync) requestStateFromPeer(ctx context.Context, stateRoot []byte, peer p2p.Peer) error {
_, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.requestStateFromPeer")
func (s *InitialSync) requestStateFromPeer(ctx context.Context, stateRoot []byte, peerID peer.ID) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.requestStateFromPeer")
defer span.End()
stateReq.Inc()
log.Debugf("Successfully processed incoming block with state hash: %#x", stateRoot)
s.p2p.Send(&pb.BeaconStateRequest{Hash: stateRoot}, peer)
return nil
return s.p2p.Send(ctx, &pb.BeaconStateRequest{Hash: stateRoot}, peerID)
}
// requestNextBlock broadcasts a request for a block with the entered slotnumber.
@@ -405,7 +405,7 @@ func (s *InitialSync) requestNextBlockBySlot(ctx context.Context, slotNumber uin
s.mutex.Lock()
defer s.mutex.Unlock()
if block, ok := s.inMemoryBlocks[slotNumber]; ok {
s.processBlock(ctx, block, p2p.Peer{})
s.processBlock(ctx, block, p2p.AnyPeer)
return
}
s.p2p.Broadcast(&pb.BeaconBlockRequestBySlotNumber{SlotNumber: slotNumber})

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p-peer"
b "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
@@ -29,7 +30,8 @@ func (mp *mockP2P) Subscribe(msg proto.Message, channel chan p2p.Message) event.
func (mp *mockP2P) Broadcast(msg proto.Message) {}
func (mp *mockP2P) Send(msg proto.Message, peer p2p.Peer) {
func (mp *mockP2P) Send(ctx context.Context, msg proto.Message, peerID peer.ID) error {
return nil
}
type mockSyncService struct {
@@ -150,7 +152,7 @@ func TestSavingBlock_InSync(t *testing.T) {
}
return p2p.Message{
Peer: p2p.Peer{},
Peer: "",
Data: blockResponse,
Ctx: context.Background(),
}
@@ -166,7 +168,7 @@ func TestSavingBlock_InSync(t *testing.T) {
ss.blockBuf <- msg1
msg2 := p2p.Message{
Peer: p2p.Peer{},
Peer: "",
Data: incorrectStateResponse,
Ctx: context.Background(),
}
@@ -262,13 +264,13 @@ func TestDelayChan_OK(t *testing.T) {
}
msg1 := p2p.Message{
Peer: p2p.Peer{},
Peer: "",
Data: blockResponse,
Ctx: context.Background(),
}
msg2 := p2p.Message{
Peer: p2p.Peer{},
Peer: "",
Data: stateResponse,
Ctx: context.Background(),
}
@@ -355,7 +357,7 @@ func TestRequestBlocksBySlot_OK(t *testing.T) {
}
return p2p.Message{
Peer: p2p.Peer{},
Peer: "",
Data: blockResponse,
Ctx: context.Background(),
}, root

View File

@@ -39,8 +39,8 @@ type operationService interface {
}
type p2pAPI interface {
p2p.Sender
Subscribe(msg proto.Message, channel chan p2p.Message) event.Subscription
Send(msg proto.Message, peer p2p.Peer)
Broadcast(msg proto.Message)
}
@@ -227,6 +227,7 @@ func (rs *RegularSync) run() {
rs.broadcastCanonicalBlock(rs.ctx, block)
}
}
log.Info("Exiting regular sync run()")
}
// safelyHandleMessage will recover and log any panic that occurs from the
@@ -277,8 +278,10 @@ func (rs *RegularSync) receiveBlockAnnounce(msg p2p.Message) {
log.WithField("blockRoot", fmt.Sprintf("%#x", h)).Debug("Received incoming block root, requesting full block data from sender")
// Request the full block data from peer that sent the block hash.
_, sendBlockRequestSpan := trace.StartSpan(ctx, "beacon-chain.sync.sendBlockRequest")
rs.p2p.Send(&pb.BeaconBlockRequest{Hash: h[:]}, msg.Peer)
ctx, sendBlockRequestSpan := trace.StartSpan(ctx, "beacon-chain.sync.sendBlockRequest")
if err := rs.p2p.Send(ctx, &pb.BeaconBlockRequest{Hash: h[:]}, msg.Peer); err != nil {
log.Error(err)
}
sentBlockReq.Inc()
sendBlockRequestSpan.End()
}
@@ -376,12 +379,14 @@ func (rs *RegularSync) handleBlockRequestBySlot(msg p2p.Message) {
return
}
_, sendBlockSpan := trace.StartSpan(ctx, "sendBlock")
ctx, sendBlockSpan := trace.StartSpan(ctx, "sendBlock")
log.WithField("slotNumber",
fmt.Sprintf("%d", request.SlotNumber-params.BeaconConfig().GenesisSlot)).Debug("Sending requested block to peer")
rs.p2p.Send(&pb.BeaconBlockResponse{
if err := rs.p2p.Send(ctx, &pb.BeaconBlockResponse{
Block: block,
}, msg.Peer)
}, msg.Peer); err != nil {
log.Error(err)
}
sentBlocks.Inc()
sendBlockSpan.End()
}
@@ -409,9 +414,11 @@ func (rs *RegularSync) handleStateRequest(msg p2p.Message) {
log.Debugf("Requested state root is different from locally stored state root %#x", req.Hash)
return
}
_, sendStateSpan := trace.StartSpan(ctx, "beacon-chain.sync.sendState")
ctx, sendStateSpan := trace.StartSpan(ctx, "beacon-chain.sync.sendState")
log.WithField("beaconState", fmt.Sprintf("%#x", root)).Debug("Sending beacon state to peer")
rs.p2p.Send(&pb.BeaconStateResponse{BeaconState: state}, msg.Peer)
if err := rs.p2p.Send(ctx, &pb.BeaconStateResponse{BeaconState: state}, msg.Peer); err != nil {
log.Error(err)
}
sentState.Inc()
sendStateSpan.End()
}
@@ -442,10 +449,12 @@ func (rs *RegularSync) handleChainHeadRequest(msg p2p.Message) {
Hash: blockRoot[:],
Block: block,
}
_, ChainHead := trace.StartSpan(ctx, "sendChainHead")
rs.p2p.Send(req, msg.Peer)
ctx, ChainHead := trace.StartSpan(ctx, "sendChainHead")
defer ChainHead.End()
if err := rs.p2p.Send(ctx, req, msg.Peer); err != nil {
log.Error(err)
}
sentChainHead.Inc()
ChainHead.End()
}
// receiveAttestation accepts an broadcasted attestation from the p2p layer,
@@ -531,12 +540,14 @@ func (rs *RegularSync) handleBlockRequestByHash(msg p2p.Message) {
return
}
_, sendBlockSpan := trace.StartSpan(ctx, "sendBlock")
rs.p2p.Send(&pb.BeaconBlockResponse{
ctx, sendBlockSpan := trace.StartSpan(ctx, "sendBlock")
defer sendBlockSpan.End()
if err := rs.p2p.Send(ctx, &pb.BeaconBlockResponse{
Block: block,
}, msg.Peer)
}, msg.Peer); err != nil {
log.Error(err)
}
sentBlocks.Inc()
sendBlockSpan.End()
}
// handleBatchedBlockRequest receives p2p messages which consist of requests for batched blocks
@@ -588,13 +599,15 @@ func (rs *RegularSync) handleBatchedBlockRequest(msg p2p.Message) {
response = append(response, retBlock)
}
_, sendBatchedBlockSpan := trace.StartSpan(ctx, "sendBatchedBlocks")
ctx, sendBatchedBlockSpan := trace.StartSpan(ctx, "sendBatchedBlocks")
defer sendBatchedBlockSpan.End()
log.Debugf("Sending response for batch blocks to peer %v", msg.Peer)
rs.p2p.Send(&pb.BatchedBeaconBlockResponse{
if err := rs.p2p.Send(ctx, &pb.BatchedBeaconBlockResponse{
BatchedBlocks: response,
}, msg.Peer)
}, msg.Peer); err != nil {
log.Error(err)
}
sentBatchedBlocks.Inc()
sendBatchedBlockSpan.End()
}
func (rs *RegularSync) handleAttestationRequestByHash(msg p2p.Message) {
@@ -614,13 +627,15 @@ func (rs *RegularSync) handleAttestationRequestByHash(msg p2p.Message) {
return
}
_, sendAttestationSpan := trace.StartSpan(ctx, "sendAttestation")
ctx, sendAttestationSpan := trace.StartSpan(ctx, "sendAttestation")
defer sendAttestationSpan.End()
log.Debugf("Sending attestation %#x to peer %v", root, msg.Peer)
rs.p2p.Send(&pb.AttestationResponse{
if err := rs.p2p.Send(ctx, &pb.AttestationResponse{
Attestation: att,
}, msg.Peer)
}, msg.Peer); err != nil {
log.Error(err)
}
sentAttestation.Inc()
sendAttestationSpan.End()
}
func (rs *RegularSync) handleUnseenAttestationsRequest(msg p2p.Message) {
@@ -643,13 +658,15 @@ func (rs *RegularSync) handleUnseenAttestationsRequest(msg p2p.Message) {
return
}
_, sendAttestationsSpan := trace.StartSpan(ctx, "beacon-chain.sync.sendAttestation")
ctx, sendAttestationsSpan := trace.StartSpan(ctx, "beacon-chain.sync.sendAttestation")
defer sendAttestationsSpan.End()
log.Debugf("Sending response for batched unseen attestations to peer %v", msg.Peer)
rs.p2p.Send(&pb.UnseenAttestationResponse{
if err := rs.p2p.Send(ctx, &pb.UnseenAttestationResponse{
Attestations: atts,
}, msg.Peer)
}, msg.Peer); err != nil {
log.Error(err)
}
sentAttestation.Inc()
sendAttestationsSpan.End()
}
func (rs *RegularSync) broadcastCanonicalBlock(ctx context.Context, blk *pb.BeaconBlock) {

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p-peer"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/internal"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
@@ -36,7 +37,8 @@ func (mp *mockP2P) Subscribe(msg proto.Message, channel chan p2p.Message) event.
func (mp *mockP2P) Broadcast(msg proto.Message) {}
func (mp *mockP2P) Send(msg proto.Message, peer p2p.Peer) {
func (mp *mockP2P) Send(ctx context.Context, msg proto.Message, peerID peer.ID) error {
return nil
}
type mockChainService struct {
@@ -117,7 +119,7 @@ func TestProcessBlockRoot_OK(t *testing.T) {
msg := p2p.Message{
Ctx: context.Background(),
Peer: p2p.Peer{},
Peer: "",
Data: hashAnnounce,
}
@@ -198,7 +200,7 @@ func TestProcessBlock_OK(t *testing.T) {
msg := p2p.Message{
Ctx: context.Background(),
Peer: p2p.Peer{},
Peer: "",
Data: responseBlock,
}
@@ -277,7 +279,7 @@ func TestProcessBlock_MultipleBlocksProcessedOK(t *testing.T) {
msg1 := p2p.Message{
Ctx: context.Background(),
Peer: p2p.Peer{},
Peer: "",
Data: responseBlock1,
}
@@ -302,7 +304,7 @@ func TestProcessBlock_MultipleBlocksProcessedOK(t *testing.T) {
msg2 := p2p.Message{
Ctx: context.Background(),
Peer: p2p.Peer{},
Peer: "",
Data: responseBlock2,
}
@@ -380,7 +382,7 @@ func TestProcessBlock_MissingParentBlockRequestedOK(t *testing.T) {
msg1 := p2p.Message{
Ctx: context.Background(),
Peer: p2p.Peer{},
Peer: "",
Data: &pb.BeaconBlockResponse{
Block: block1,
},
@@ -388,7 +390,7 @@ func TestProcessBlock_MissingParentBlockRequestedOK(t *testing.T) {
msg2 := p2p.Message{
Ctx: context.Background(),
Peer: p2p.Peer{},
Peer: "",
Data: &pb.BeaconBlockResponse{
Block: block2,
},
@@ -396,7 +398,7 @@ func TestProcessBlock_MissingParentBlockRequestedOK(t *testing.T) {
msg3 := p2p.Message{
Ctx: context.Background(),
Peer: p2p.Peer{},
Peer: "",
Data: &pb.BeaconBlockResponse{
Block: block3,
},
@@ -440,7 +442,7 @@ func TestBlockRequest_InvalidMsg(t *testing.T) {
invalidmsg := p2p.Message{
Ctx: context.Background(),
Data: malformedRequest,
Peer: p2p.Peer{},
Peer: "",
}
ss.blockRequestBySlot <- invalidmsg
@@ -470,7 +472,7 @@ func TestBlockRequest_OK(t *testing.T) {
msg1 := p2p.Message{
Ctx: context.Background(),
Data: request1,
Peer: p2p.Peer{},
Peer: "",
}
ss.blockRequestBySlot <- msg1
@@ -515,7 +517,7 @@ func TestReceiveAttestation_OK(t *testing.T) {
msg1 := p2p.Message{
Ctx: context.Background(),
Data: request1,
Peer: p2p.Peer{},
Peer: "",
}
ss.attestationBuf <- msg1
@@ -558,7 +560,7 @@ func TestReceiveAttestation_OlderThanPrevEpoch(t *testing.T) {
msg1 := p2p.Message{
Ctx: context.Background(),
Data: request1,
Peer: p2p.Peer{},
Peer: "",
}
ss.attestationBuf <- msg1
@@ -597,7 +599,7 @@ func TestReceiveExitReq_OK(t *testing.T) {
msg1 := p2p.Message{
Ctx: context.Background(),
Data: request1,
Peer: p2p.Peer{},
Peer: "",
}
ss.exitBuf <- msg1
@@ -632,7 +634,7 @@ func TestHandleAttReq_HashNotFound(t *testing.T) {
msg := p2p.Message{
Ctx: context.Background(),
Data: req,
Peer: p2p.Peer{},
Peer: "",
}
ss.attestationReqByHashBuf <- msg
@@ -666,7 +668,7 @@ func TestHandleUnseenAttsReq_EmptyAttsPool(t *testing.T) {
msg := p2p.Message{
Ctx: context.Background(),
Data: req,
Peer: p2p.Peer{},
Peer: "",
}
ss.unseenAttestationsReqBuf <- msg
@@ -712,7 +714,7 @@ func TestHandleAttReq_Ok(t *testing.T) {
msg := p2p.Message{
Ctx: context.Background(),
Data: req,
Peer: p2p.Peer{},
Peer: "",
}
ss.attestationReqByHashBuf <- msg
@@ -753,7 +755,7 @@ func TestHandleUnseenAttsReq_Ok(t *testing.T) {
msg := p2p.Message{
Ctx: context.Background(),
Data: req,
Peer: p2p.Peer{},
Peer: "",
}
ss.unseenAttestationsReqBuf <- msg
@@ -790,7 +792,7 @@ func TestHandleStateReq_NOState(t *testing.T) {
msg1 := p2p.Message{
Ctx: context.Background(),
Data: request1,
Peer: p2p.Peer{},
Peer: "",
}
ss.stateRequestBuf <- msg1
@@ -837,7 +839,7 @@ func TestHandleStateReq_OK(t *testing.T) {
msg1 := p2p.Message{
Ctx: context.Background(),
Data: request1,
Peer: p2p.Peer{},
Peer: "",
}
ss.stateRequestBuf <- msg1

View File

@@ -8,6 +8,7 @@ import (
"time"
"github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p-peer"
"github.com/prysmaticlabs/prysm/beacon-chain/chaintest/backend"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
@@ -53,7 +54,7 @@ func (sim *simulatedP2P) Broadcast(msg proto.Message) {
feed.Send(p2p.Message{Ctx: sim.ctx, Data: msg})
}
func (sim *simulatedP2P) Send(msg proto.Message, peer p2p.Peer) {
func (sim *simulatedP2P) Send(ctx context.Context, msg proto.Message, peerID peer.ID) error {
sim.mutex.Lock()
defer sim.mutex.Unlock()
@@ -61,10 +62,11 @@ func (sim *simulatedP2P) Send(msg proto.Message, peer p2p.Peer) {
feed, ok := sim.subsChannels[protoType]
if !ok {
return
return nil
}
feed.Send(p2p.Message{Ctx: sim.ctx, Data: msg})
return nil
}
func setupSimBackendAndDB(t *testing.T) (*backend.SimulatedBackend, *db.BeaconDB, []*bls.SecretKey) {
@@ -188,13 +190,13 @@ func setUpUnSyncedService(simP2P *simulatedP2P, stateRoot [32]byte, t *testing.T
go ss.run()
for ss.Querier.currentHeadSlot == 0 {
simP2P.Send(&pb.ChainHeadResponse{
simP2P.Send(simP2P.ctx, &pb.ChainHeadResponse{
Slot: params.BeaconConfig().GenesisSlot + 12,
Hash: []byte{'t', 'e', 's', 't'},
Block: &pb.BeaconBlock{
StateRootHash32: stateRoot[:],
},
}, p2p.Peer{})
}, "")
}
return ss, beacondb

View File

@@ -36,10 +36,10 @@ spec:
topologyKey: failure-domain.beta.kubernetes.io/zone
containers:
- name: beacon-chain
image: gcr.io/prysmaticlabs/prysm/beacon-chain:latest
image: gcr.io/prysmaticlabs/prysm/beacon-chain@sha256:a8f6acb67d0eaf0c5f4a118bd65fb411cf11f71e02f9912893a4540daa678417
args:
- --web3provider=ws://public-rpc-nodes.pow.svc.cluster.local:8546
#- --verbosity=debug
#- --verbosity=debug
- --deposit-contract=$(DEPOSIT_CONTRACT_ADDRESS)
- --rpc-port=4000
- --monitoring-port=9090

View File

@@ -4,4 +4,4 @@ metadata:
name: beacon-config
namespace: beacon-chain
data:
DEPOSIT_CONTRACT_ADDRESS: "0x18229c4D0864106766F00C543e758741f63C90e6"
DEPOSIT_CONTRACT_ADDRESS: "0x8bDa9BAB2fa46D08D33b85491F14CE2B63B6820b"

View File

@@ -12,7 +12,6 @@ go_library(
"monitoring.go",
"options.go",
"p2p.go",
"peer.go",
"service.go",
],
importpath = "github.com/prysmaticlabs/prysm/shared/p2p",
@@ -20,6 +19,7 @@ go_library(
deps = [
"//shared/event:go_default_library",
"//shared/iputils:go_default_library",
"@com_github_gogo_protobuf//io:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_ipfs_go_datastore//:go_default_library",
"@com_github_ipfs_go_datastore//sync:go_default_library",
@@ -30,7 +30,10 @@ go_library(
"@com_github_libp2p_go_libp2p//p2p/host/routed:go_default_library",
"@com_github_libp2p_go_libp2p_host//:go_default_library",
"@com_github_libp2p_go_libp2p_kad_dht//:go_default_library",
"@com_github_libp2p_go_libp2p_net//:go_default_library",
"@com_github_libp2p_go_libp2p_peer//:go_default_library",
"@com_github_libp2p_go_libp2p_peerstore//:go_default_library",
"@com_github_libp2p_go_libp2p_protocol//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
@@ -58,10 +61,12 @@ go_test(
"//shared:go_default_library",
"//shared/p2p/mock:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_gogo_protobuf//io:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_golang_mock//gomock:go_default_library",
"@com_github_ipfs_go_log//:go_default_library",
"@com_github_libp2p_go_libp2p_blankhost//:go_default_library",
"@com_github_libp2p_go_libp2p_peerstore//:go_default_library",
"@com_github_libp2p_go_libp2p_protocol//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_swarm//testing:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",

View File

@@ -5,6 +5,7 @@ import (
"reflect"
"github.com/gogo/protobuf/proto"
peer "github.com/libp2p/go-libp2p-peer"
)
// Message represents a message received from an external peer.
@@ -12,7 +13,7 @@ type Message struct {
// Ctx message context.
Ctx context.Context
// Peer represents the sender of the message.
Peer Peer
Peer peer.ID
// Data can be any type of message found in sharding/p2p/proto package.
Data proto.Message
}

View File

@@ -13,6 +13,11 @@
// Read more about gossipsub at https://github.com/vyzo/gerbil-simsub
package p2p
import "github.com/libp2p/go-libp2p-peer"
// AnyPeer represents a Peer ID alias for sending to any available peer(s).
const AnyPeer = peer.ID("AnyPeer")
// Use this file for interfaces only!
// Adapter is used to create middleware.

View File

@@ -1,6 +0,0 @@
package p2p
// Peer TODO(175): - Design and implement.
// See design doc: https://docs.google.com/document/d/1cthKuGPreOSQH96Ujt7sArcT-IRICk6b-QcdD0EnLsI/edit
// https://github.com/prysmaticlabs/prysm/issues/175
type Peer struct{}

View File

@@ -4,16 +4,21 @@ import (
"context"
"errors"
"fmt"
"io"
"net"
"reflect"
"sync"
ggio "github.com/gogo/protobuf/io"
"github.com/gogo/protobuf/proto"
ds "github.com/ipfs/go-datastore"
dsync "github.com/ipfs/go-datastore/sync"
libp2p "github.com/libp2p/go-libp2p"
host "github.com/libp2p/go-libp2p-host"
kaddht "github.com/libp2p/go-libp2p-kad-dht"
libp2pnet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
protocol "github.com/libp2p/go-libp2p-protocol"
pubsub "github.com/libp2p/go-libp2p-pubsub"
rhost "github.com/libp2p/go-libp2p/p2p/host/routed"
"github.com/prysmaticlabs/prysm/shared/event"
@@ -22,10 +27,13 @@ import (
"go.opencensus.io/trace"
)
const prysmProtocolPrefix = "/prysm/0.0.0"
const maxMessageSize = 1 << 20
// Sender represents a struct that is able to relay information via p2p.
// Server implements this interface.
type Sender interface {
Send(msg interface{}, peer Peer)
Send(ctx context.Context, msg proto.Message, peer peer.ID) error
}
// Server is a placeholder for a p2p service. To be designed.
@@ -181,6 +189,39 @@ func (s *Server) RegisterTopic(topic string, message proto.Message, adapters ...
adapters[i], adapters[opp] = adapters[opp], adapters[i]
}
handler := func(msg proto.Message, peerID peer.ID) {
log.WithField("topic", topic).Debug("Processing incoming message")
var h Handler = func(pMsg Message) {
s.emit(pMsg, feed)
}
pMsg := Message{Ctx: s.ctx, Data: msg, Peer: peerID}
for _, adapter := range adapters {
h = adapter(h)
}
h(pMsg)
}
s.host.SetStreamHandler(protocol.ID(prysmProtocolPrefix+"/"+topic), func(stream libp2pnet.Stream) {
log.WithField("topic", topic).Debug("Received new stream")
r := ggio.NewDelimitedReader(stream, maxMessageSize)
msg := proto.Clone(message)
for {
err := r.ReadMsg(msg)
if err == io.EOF {
return // end of stream
}
if err != nil {
log.WithError(err).Error("Could not read message from stream")
return
}
handler(msg, stream.Conn().RemotePeer())
}
})
go func() {
defer sub.Cancel()
@@ -206,25 +247,20 @@ func (s *Server) RegisterTopic(topic string, message proto.Message, adapters ...
}
if err != nil {
log.Errorf("Failed to get next message: %v", err)
return
continue
}
d := message
if msg == nil || msg.GetFrom() == s.host.ID() {
continue
}
d := proto.Clone(message)
if err := proto.Unmarshal(msg.Data, d); err != nil {
log.WithError(err).Error("Failed to decode data")
continue
}
var h Handler = func(pMsg Message) {
s.emit(pMsg, feed)
}
pMsg := Message{Ctx: s.ctx, Data: d}
for _, adapter := range adapters {
h = adapter(h)
}
h(pMsg)
handler(d, msg.GetFrom())
}
}()
}
@@ -258,15 +294,26 @@ func (s *Server) Subscribe(msg proto.Message, channel chan Message) event.Subscr
return s.Feed(msg).Subscribe(channel)
}
// Send a message to a specific peer.
func (s *Server) Send(msg proto.Message, peer Peer) {
// TODO(#175)
// https://github.com/prysmaticlabs/prysm/issues/175
// Send a message to a specific peer. If the peerID is set to p2p.AnyPeer, then
// this method will act as a broadcast.
func (s *Server) Send(ctx context.Context, msg proto.Message, peerID peer.ID) error {
ctx, span := trace.StartSpan(ctx, "p2p.Send")
defer span.End()
if peerID == AnyPeer {
s.Broadcast(msg)
return nil
}
// TODO(#175): Remove debug log after send is implemented.
_ = peer
log.Debug("Broadcasting to everyone rather than sending a single peer")
s.Broadcast(msg)
topic := s.topicMapping[messageType(msg)]
pid := protocol.ID(prysmProtocolPrefix + "/" + topic)
stream, err := s.host.NewStream(ctx, peerID, pid)
if err != nil {
return err
}
w := ggio.NewDelimitedWriter(stream)
defer w.Close()
return w.WriteMsg(msg)
}
// Broadcast publishes a message to all localized peers using gossipsub.
@@ -301,7 +348,8 @@ func (s *Server) Broadcast(msg proto.Message) {
} else {
log.WithFields(logrus.Fields{
"topic": topic,
}).Debugf("Broadcasting msg %+v", msg)
"msg": msg,
}).Debug("Broadcasting msg")
}
if topic == "" {

View File

@@ -9,10 +9,12 @@ import (
"testing"
"time"
ggio "github.com/gogo/protobuf/io"
"github.com/gogo/protobuf/proto"
"github.com/golang/mock/gomock"
ipfslog "github.com/ipfs/go-log"
bhost "github.com/libp2p/go-libp2p-blankhost"
pstore "github.com/libp2p/go-libp2p-peerstore"
protocol "github.com/libp2p/go-libp2p-protocol"
pubsub "github.com/libp2p/go-libp2p-pubsub"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
shardpb "github.com/prysmaticlabs/prysm/proto/sharding/p2p/v1"
@@ -27,10 +29,10 @@ import (
// Ensure that server implements service.
var _ = shared.Service(&Server{})
var _ = Broadcaster(&Server{})
var _ = Sender(&Server{})
func init() {
logrus.SetLevel(logrus.DebugLevel)
ipfslog.SetDebugLogging()
}
func TestStartDialRelayNode_InvalidMultiaddress(t *testing.T) {
@@ -96,7 +98,35 @@ func TestEmit_OK(t *testing.T) {
}
}
func TestSubscribeToTopic_OK(t *testing.T) {
func TestSubscribeToTopic_onPubSub_OK(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second)
defer cancel()
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h2 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
gsub, err := pubsub.NewFloodSub(ctx, h2)
if err != nil {
t.Errorf("Failed to create pubsub: %v", err)
}
s := Server{
ctx: ctx,
gsub: gsub,
host: h,
feeds: make(map[reflect.Type]Feed),
mutex: &sync.Mutex{},
topicMapping: make(map[reflect.Type]string),
}
feed := s.Feed(&shardpb.CollationBodyRequest{})
ch := make(chan Message)
sub := feed.Subscribe(ch)
defer sub.Unsubscribe()
testSubscribe(ctx, t, s, gsub, ch)
}
func TestSubscribeToTopic_directMessaging_OK(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second)
defer cancel()
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
@@ -119,16 +149,49 @@ func TestSubscribeToTopic_OK(t *testing.T) {
ch := make(chan Message)
sub := feed.Subscribe(ch)
defer sub.Unsubscribe()
topic := shardpb.Topic_COLLATION_BODY_REQUEST
testSubscribe(ctx, t, s, gsub, ch)
s.RegisterTopic(topic.String(), &shardpb.CollationBodyRequest{})
pbMsg := &shardpb.CollationBodyRequest{ShardId: 5}
done := make(chan bool)
go func() {
// The message should be received from the feed.
msg := <-ch
if !proto.Equal(msg.Data.(proto.Message), pbMsg) {
t.Errorf("Unexpected msg: %+v. Wanted %+v.", msg.Data, pbMsg)
}
done <- true
}()
h2 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
if err := h2.Connect(ctx, pstore.PeerInfo{ID: h.ID(), Addrs: h.Addrs()}); err != nil {
t.Fatal(err)
}
stream, err := h2.NewStream(ctx, h.ID(), protocol.ID(prysmProtocolPrefix+"/"+topic.String()))
if err != nil {
t.Fatal(err)
}
w := ggio.NewDelimitedWriter(stream)
defer w.Close()
w.WriteMsg(pbMsg)
// Wait for our message assertion to complete.
select {
case <-done:
case <-ctx.Done():
t.Error("Context timed out before a message was received!")
}
}
func TestSubscribe_OK(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second)
defer cancel()
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h2 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
gsub, err := pubsub.NewFloodSub(ctx, h)
gsub, err := pubsub.NewFloodSub(ctx, h2)
if err != nil {
t.Errorf("Failed to create pubsub: %v", err)
}
@@ -199,8 +262,9 @@ func TestRegisterTopic_InvalidProtobufs(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second)
defer cancel()
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h2 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
gsub, err := pubsub.NewFloodSub(ctx, h)
gsub, err := pubsub.NewFloodSub(ctx, h2)
if err != nil {
t.Errorf("Failed to create floodsub: %v", err)