split channel

This commit is contained in:
decanus
2019-06-13 13:52:07 -04:00
parent fe80ea7ad0
commit d1ed6f5fd2
2 changed files with 56 additions and 36 deletions

40
main.go
View File

@@ -4,16 +4,13 @@ import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"errors"
"flag"
"fmt"
"log"
math "math/rand"
"sync"
"time"
"github.com/status-im/mvds/node"
"github.com/status-im/mvds/protobuf"
"github.com/status-im/mvds/state"
"github.com/status-im/mvds/store"
"github.com/status-im/mvds/transport"
@@ -28,32 +25,6 @@ var (
interactive int
)
type Transport struct {
sync.Mutex
in <-chan transport.Packet
out map[state.PeerID]chan<- transport.Packet
}
func (t *Transport) Watch() transport.Packet {
return <-t.in
}
func (t *Transport) Send(group state.GroupID, sender state.PeerID, peer state.PeerID, payload protobuf.Payload) error {
math.Seed(time.Now().UnixNano())
if math.Intn(100) < offline {
return nil
}
c, ok := t.out[peer]
if !ok {
return errors.New("peer unknown")
}
c <- transport.Packet{Group: group, Sender: sender, Payload: payload}
return nil
}
func init() {
flag.IntVar(&offline, "offline", 90, "percentage of time a node is offline")
flag.IntVar(&nodeCount, "nodes", 3, "amount of nodes")
@@ -68,16 +39,13 @@ func main() {
// @todo validate flags
transports := make([]*Transport, 0)
transports := make([]*transport.ChannelTransport, 0)
input := make([]chan transport.Packet, 0)
nodes := make([]*node.Node, 0)
for i := 0; i < nodeCount; i++ {
in := make(chan transport.Packet)
t := &Transport{
in: in,
out: make(map[state.PeerID]chan<- transport.Packet),
}
t := transport.NewChannelTransport(offline, in)
input = append(input, in)
transports = append(transports, t)
@@ -102,7 +70,7 @@ func main() {
for _, p := range peers {
peer := nodes[p].ID
transports[i].out[peer] = input[p]
transports[i].AddOutput(peer, input[p])
n.AddPeer(group, peer)
log.Printf("%x sharing with %x", n.ID[:4], peer[:4])
@@ -139,7 +107,7 @@ OUTER:
return peers
}
func createNode(transport *Transport, id state.PeerID, mode node.Mode) *node.Node {
func createNode(transport transport.Transport, id state.PeerID, mode node.Mode) *node.Node {
ds := store.NewDummyStore()
return node.NewNode(
&ds,

View File

@@ -0,0 +1,52 @@
package transport
import (
"errors"
math "math/rand"
"sync"
"time"
"github.com/status-im/mvds/protobuf"
"github.com/status-im/mvds/state"
)
// ChannelTransport implements a basic MVDS transport using channels for basic testing purposes.
type ChannelTransport struct {
sync.Mutex
offline int
in <-chan Packet
out map[state.PeerID]chan<- Packet
}
func NewChannelTransport(offline int, in <-chan Packet) *ChannelTransport {
return &ChannelTransport{
offline: offline,
in: in,
out: make(map[state.PeerID]chan<- Packet),
}
}
func (t *ChannelTransport) AddOutput(id state.PeerID, c chan<-Packet) {
t.out[id] = c
}
func (t *ChannelTransport) Watch() Packet {
return <-t.in
}
func (t *ChannelTransport) Send(group state.GroupID, sender state.PeerID, peer state.PeerID, payload protobuf.Payload) error {
math.Seed(time.Now().UnixNano())
if math.Intn(100) < t.offline {
return nil
}
c, ok := t.out[peer]
if !ok {
return errors.New("peer unknown")
}
c <- Packet{Group: group, Sender: sender, Payload: payload}
return nil
}