Ignore Messages From Local Peer (#3299)

* validate message coming into pipeline

* gaz

* add to deprecated p2p

* add new lib

* change lib
This commit is contained in:
Nishant Das
2019-08-25 00:11:24 +05:30
committed by Preston Van Loon
parent 122166b317
commit 919877f301
7 changed files with 52 additions and 0 deletions

View File

@@ -48,6 +48,7 @@ type PubSubProvider interface {
// PeerManager abstracts some peer management methods from libp2p.
type PeerManager interface {
Disconnect(peer.ID) error
PeerID() peer.ID
}
// HandshakeManager abstracts certain methods regarding handshake records.

View File

@@ -153,6 +153,11 @@ func (s *Service) SetStreamHandler(topic string, handler network.StreamHandler)
s.host.SetStreamHandler(protocol.ID(topic), handler)
}
// PeerID returns the Peer ID of the local peer.
func (s *Service) PeerID() peer.ID {
return s.host.ID()
}
// Disconnect from a peer.
func (s *Service) Disconnect(pid peer.ID) error {
return s.host.Network().ClosePeer(pid)

View File

@@ -139,6 +139,11 @@ func (p *TestP2P) AddHandshake(pid peer.ID, hello *pb.Hello) {
// TODO(3147): add this.
}
// PeerID returns the Peer ID of the local peer.
func (p *TestP2P) PeerID() peer.ID {
return p.Host.ID()
}
// Send a message to a specific peer.
func (p *TestP2P) Send(ctx context.Context, msg proto.Message, pid peer.ID) (network.Stream, error) {
return nil, nil

View File

@@ -90,5 +90,6 @@ go_test(
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//protocol:go_default_library",
"@com_github_prysmaticlabs_go_ssz//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)

View File

@@ -128,6 +128,10 @@ func (r *RegularSync) subscribe(topic string, validate validator, handle subHand
return
}
if msg.GetFrom() == r.p2p.PeerID() {
continue
}
go pipeline(msg.Data)
}
}

View File

@@ -1,7 +1,9 @@
package sync
import (
"bytes"
"context"
"errors"
"sync"
"testing"
"time"
@@ -10,6 +12,7 @@ import (
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
pb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/testutil"
logTest "github.com/sirupsen/logrus/hooks/test"
)
func TestSubscribe_ReceivesValidMessage(t *testing.T) {
@@ -61,3 +64,31 @@ func TestSubscribe_HandlesPanic(t *testing.T) {
t.Fatal("Did not receive PubSub in 1 second")
}
}
func TestSubscribe_IgnoreMessageFromSelf(t *testing.T) {
hook := logTest.NewGlobal()
p2p := p2ptest.NewTestP2P(t)
r := RegularSync{
ctx: context.Background(),
p2p: p2p,
}
topic := "/eth2/voluntary_exit"
errorMsg := "Message entered into pipeline despite coming from same peer"
r.subscribe(topic, noopValidator, func(_ context.Context, msg proto.Message) error {
return errors.New(errorMsg)
})
buf := new(bytes.Buffer)
msg := &pb.VoluntaryExit{Epoch: 55}
if _, err := p2p.Encoding().Encode(buf, msg); err != nil {
t.Fatalf("Failed to encode message: %v", err)
}
if err := p2p.PubSub().Publish(topic+p2p.Encoding().ProtocolSuffix(), buf.Bytes()); err != nil {
t.Fatalf("Failed to publish message; %v", err)
}
time.Sleep(100 * time.Millisecond)
testutil.AssertLogsDoNotContain(t, hook, errorMsg)
}

View File

@@ -425,6 +425,11 @@ func (s *Server) Subscribe(msg proto.Message, channel chan Message) event.Subscr
return s.Feed(msg).Subscribe(channel)
}
// PeerID returns the local peer.
func (s *Server) PeerID() peer.ID {
return s.host.ID()
}
// Send a message to a specific peer. If the peerID is set to p2p.AnyPeer, then
// this method will act as a broadcast.
func (s *Server) Send(ctx context.Context, msg proto.Message, peerID peer.ID) (network.Stream, error) {