From 48a12fa6379c789fb5d84d30b427a2a0e147967f Mon Sep 17 00:00:00 2001 From: Dean Eigenmann Date: Tue, 6 Aug 2019 11:18:46 +0200 Subject: [PATCH] feature/subscribe (#62) * readded suubscription code * added tes * changed the way channels are created * closes channel on stop --- mvds_batch_test.go | 21 +++++++++++++++++---- node/node.go | 19 +++++++++++++++++++ 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/mvds_batch_test.go b/mvds_batch_test.go index a8eadeb..2125b06 100644 --- a/mvds_batch_test.go +++ b/mvds_batch_test.go @@ -1,14 +1,15 @@ package main import ( + "testing" + "time" + "github.com/stretchr/testify/suite" "github.com/vacp2p/mvds/node" "github.com/vacp2p/mvds/peers" "github.com/vacp2p/mvds/state" "github.com/vacp2p/mvds/store" "github.com/vacp2p/mvds/transport" - "testing" - "time" ) func TestMVDSBatchSuite(t *testing.T) { @@ -67,7 +68,10 @@ func (s *MVDSBatchSuite) TearDownTest() { } func (s *MVDSBatchSuite) TestSendClient1ToClient2() { - messageID, err := s.client1.AppendMessage(s.groupID, []byte("message 1")) + subscription := s.client2.Subscribe() + content := []byte("message 1") + + messageID, err := s.client1.AppendMessage(s.groupID, content) s.Require().NoError(err) // Check message is in store @@ -80,10 +84,16 @@ func (s *MVDSBatchSuite) TestSendClient1ToClient2() { message1Receiver, err := s.ds2.Get(messageID) return err == nil && message1Receiver != nil }, 1*time.Second, 10*time.Millisecond) + + message := <- subscription + s.Equal(message.Body, content) } func (s *MVDSBatchSuite) TestSendClient2ToClient1() { - messageID, err := s.client2.AppendMessage(s.groupID, []byte("message 1")) + subscription := s.client1.Subscribe() + content := []byte("message 1") + + messageID, err := s.client2.AppendMessage(s.groupID, content) s.Require().NoError(err) // Check message is in store @@ -96,6 +106,9 @@ func (s *MVDSBatchSuite) TestSendClient2ToClient1() { message1Receiver, err := s.ds1.Get(messageID) return err == nil && message1Receiver != nil }, 1*time.Second, 10*time.Millisecond) + + message := <- subscription + s.Equal(message.Body, content) } func (s *MVDSBatchSuite) TestAcks() { diff --git a/node/node.go b/node/node.go index 765c655..dd2131a 100644 --- a/node/node.go +++ b/node/node.go @@ -48,6 +48,8 @@ type Node struct { epoch int64 mode Mode + + subscription chan protobuf.Message } // NewNode returns a new node. @@ -115,9 +117,21 @@ func (n *Node) Start(duration time.Duration) { // Stop message reading and epoch processing func (n *Node) Stop() { log.Print("Stopping node") + close(n.subscription) n.cancel() } +// Subscribe subscribes to incoming messages. +func (n *Node) Subscribe() chan protobuf.Message { + n.subscription = make(chan protobuf.Message) + return n.subscription +} + +// Unsubscribe closes the listening channels +func (n *Node) Unsubscribe() { + close(n.subscription) +} + // AppendMessage sends a message to a given group. func (n *Node) AppendMessage(groupID state.GroupID, data []byte) (state.MessageID, error) { m := protobuf.Message{ @@ -359,6 +373,7 @@ func (n *Node) onMessage(sender state.PeerID, msg protobuf.Message) error { if err != nil { return err } + for _, peer := range peers { if peer == sender { continue @@ -367,6 +382,10 @@ func (n *Node) onMessage(sender state.PeerID, msg protobuf.Message) error { n.insertSyncState(&groupID, id, peer, state.OFFER) } + if n.subscription != nil { + n.subscription <- msg + } + return nil }