feature/subscription (#59)

This commit is contained in:
Dean Eigenmann
2019-07-15 05:32:51 +02:00
committed by GitHub
parent 55cc8fe0f5
commit 27db1bb2c6
2 changed files with 21 additions and 13 deletions

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/status-im/mvds/node"
"github.com/status-im/mvds/protobuf"
"github.com/status-im/mvds/state"
"github.com/status-im/mvds/store"
"github.com/status-im/mvds/transport"
@@ -115,6 +116,7 @@ func createNode(transport transport.Transport, id state.PeerID, mode node.Mode)
0,
id,
mode,
make(chan protobuf.Message),
)
}

View File

@@ -37,7 +37,7 @@ type Node struct {
syncState state.SyncState
peers map[state.GroupID][]state.PeerID
peers map[state.GroupID][]state.PeerID
payloads payloads
@@ -47,6 +47,8 @@ type Node struct {
epoch int64
mode Mode
subscription chan<- protobuf.Message
}
// NewNode returns a new node.
@@ -58,21 +60,23 @@ func NewNode(
currentEpoch int64,
id state.PeerID,
mode Mode,
subscription chan<- protobuf.Message,
) *Node {
ctx, cancel := context.WithCancel(context.Background())
return &Node{
ctx: ctx,
cancel: cancel,
store: ms,
transport: st,
syncState: ss,
peers: make(map[state.GroupID][]state.PeerID),
payloads: newPayloads(),
nextEpoch: nextEpoch,
ID: id,
epoch: currentEpoch,
mode: mode,
ctx: ctx,
cancel: cancel,
store: ms,
transport: st,
syncState: ss,
peers: make(map[state.GroupID][]state.PeerID),
payloads: newPayloads(),
nextEpoch: nextEpoch,
ID: id,
epoch: currentEpoch,
mode: mode,
subscription: subscription,
}
}
@@ -325,6 +329,8 @@ func (n *Node) onMessage(group state.GroupID, sender state.PeerID, msg protobuf.
}
}()
n.subscription <- msg
err = n.store.Add(msg)
if err != nil {
return err
@@ -336,7 +342,7 @@ func (n *Node) onMessage(group state.GroupID, sender state.PeerID, msg protobuf.
func (n *Node) insertSyncState(group state.GroupID, id state.MessageID, p state.PeerID, t state.RecordType) {
s := state.State{
Type: t,
Type: t,
SendEpoch: n.epoch + 1,
}