feature/subscribe (#62)

* readded suubscription code

* added tes

* changed the way channels are created

* closes channel on stop
This commit is contained in:
Dean Eigenmann
2019-08-06 11:18:46 +02:00
committed by GitHub
parent 6291da6ee1
commit 48a12fa637
2 changed files with 36 additions and 4 deletions

View File

@@ -1,14 +1,15 @@
package main
import (
"testing"
"time"
"github.com/stretchr/testify/suite"
"github.com/vacp2p/mvds/node"
"github.com/vacp2p/mvds/peers"
"github.com/vacp2p/mvds/state"
"github.com/vacp2p/mvds/store"
"github.com/vacp2p/mvds/transport"
"testing"
"time"
)
func TestMVDSBatchSuite(t *testing.T) {
@@ -67,7 +68,10 @@ func (s *MVDSBatchSuite) TearDownTest() {
}
func (s *MVDSBatchSuite) TestSendClient1ToClient2() {
messageID, err := s.client1.AppendMessage(s.groupID, []byte("message 1"))
subscription := s.client2.Subscribe()
content := []byte("message 1")
messageID, err := s.client1.AppendMessage(s.groupID, content)
s.Require().NoError(err)
// Check message is in store
@@ -80,10 +84,16 @@ func (s *MVDSBatchSuite) TestSendClient1ToClient2() {
message1Receiver, err := s.ds2.Get(messageID)
return err == nil && message1Receiver != nil
}, 1*time.Second, 10*time.Millisecond)
message := <- subscription
s.Equal(message.Body, content)
}
func (s *MVDSBatchSuite) TestSendClient2ToClient1() {
messageID, err := s.client2.AppendMessage(s.groupID, []byte("message 1"))
subscription := s.client1.Subscribe()
content := []byte("message 1")
messageID, err := s.client2.AppendMessage(s.groupID, content)
s.Require().NoError(err)
// Check message is in store
@@ -96,6 +106,9 @@ func (s *MVDSBatchSuite) TestSendClient2ToClient1() {
message1Receiver, err := s.ds1.Get(messageID)
return err == nil && message1Receiver != nil
}, 1*time.Second, 10*time.Millisecond)
message := <- subscription
s.Equal(message.Body, content)
}
func (s *MVDSBatchSuite) TestAcks() {

View File

@@ -48,6 +48,8 @@ type Node struct {
epoch int64
mode Mode
subscription chan protobuf.Message
}
// NewNode returns a new node.
@@ -115,9 +117,21 @@ func (n *Node) Start(duration time.Duration) {
// Stop message reading and epoch processing
func (n *Node) Stop() {
log.Print("Stopping node")
close(n.subscription)
n.cancel()
}
// Subscribe subscribes to incoming messages.
func (n *Node) Subscribe() chan protobuf.Message {
n.subscription = make(chan protobuf.Message)
return n.subscription
}
// Unsubscribe closes the listening channels
func (n *Node) Unsubscribe() {
close(n.subscription)
}
// AppendMessage sends a message to a given group.
func (n *Node) AppendMessage(groupID state.GroupID, data []byte) (state.MessageID, error) {
m := protobuf.Message{
@@ -359,6 +373,7 @@ func (n *Node) onMessage(sender state.PeerID, msg protobuf.Message) error {
if err != nil {
return err
}
for _, peer := range peers {
if peer == sender {
continue
@@ -367,6 +382,10 @@ func (n *Node) onMessage(sender state.PeerID, msg protobuf.Message) error {
n.insertSyncState(&groupID, id, peer, state.OFFER)
}
if n.subscription != nil {
n.subscription <- msg
}
return nil
}