Files
mvds/main.go
Dean Eigenmann 748b61123f feature/mdf (#76)
* protobuf

* do not send ack when not required

* remove from state if no ack required

* different send modes

* fix

* updated

* retransmit fix

* updated

* fixed

* renamed to ephemeral

* repeated

* gen

* resolution

* cleanup

* rough dependency code

* todo

* removed

* only stores if ephemeral

* started implementing the algo

* simplified

* updated

* we never store ephemeral messages so we did not need

* adds parents

* new schema

* tx to insert

* err

* removed old

* fixed

* changed log

* test persistence of parents

* removed

* rename

* ignoring

* Update store/messagestore_sqlite.go

Co-Authored-By: Adam Babik <adam@status.im>

* Update node/node.go

Co-Authored-By: Adam Babik <adam@status.im>

* Update node/node.go

Co-Authored-By: Adam Babik <adam@status.im>

* Update node/node.go

Co-Authored-By: Adam Babik <adam@status.im>

* more fixes

* Update store/messagestore_sqlite.go

Co-Authored-By: Adam Babik <adam@status.im>

* more fixes

* using refs

* Update node/node.go

Co-Authored-By: Adam Babik <adam@status.im>

* finished

* Update store/messagestore_sqlite.go

Co-Authored-By: Adam Babik <adam@status.im>

* Update 1572372377_initial_schema.down.sql

* desc + refactor

* started refactoring resolution

* Update README.md

* rewrote resolve

* mutex

* todo

* fixes

* sql impl

* added test

* log

* updates

* updated

* little bug

* fix

* added test

* first changes from @adambabik

* moved

* fixed test, started eventual ones

* fixed eventually test

* mock install

* consistent test

* mock

* fix lint

* Update dependency/tracker_sqlite.go

Co-Authored-By: Adam Babik <adam@status.im>

* fix
2019-11-05 17:32:23 +01:00

168 lines
3.3 KiB
Go

package main
import (
"crypto/rand"
"flag"
"fmt"
"log"
math "math/rand"
"time"
"github.com/vacp2p/mvds/dependency"
"github.com/vacp2p/mvds/node"
"github.com/vacp2p/mvds/peers"
"github.com/vacp2p/mvds/state"
"github.com/vacp2p/mvds/store"
"github.com/vacp2p/mvds/transport"
"go.uber.org/zap"
)
// Simulation parameters. Each one is populated from the command-line flag of
// the matching name in parseFlags.
var (
// offline is the percentage of time a node is offline.
offline int
// nodeCount is the number of nodes to create.
nodeCount int
// communicating controls how many nodes send messages.
communicating int
// sharing is the number of peers each node shares with.
sharing int
// interval is the number of seconds between messages.
interval int64
// interactive controls which nodes run in InteractiveMode; the remaining
// nodes run in BatchMode.
interactive int
)
// parseFlags registers the simulation's command-line flags, binding each one to
// its package-level variable, and then parses the process arguments.
func parseFlags() {
	// All int-valued flags share the same registration call, so they are
	// described in a table and registered in one loop.
	intFlags := []struct {
		target *int
		name   string
		value  int
		usage  string
	}{
		{&offline, "offline", 90, "percentage of time a node is offline"},
		{&nodeCount, "nodes", 3, "amount of nodes"},
		{&communicating, "communicating", 2, "amount of nodes sending messages"},
		{&sharing, "sharing", 2, "amount of nodes each node shares with"},
		// @todo should probably just be how many nodes are interactive
		{&interactive, "interactive", 3, "amount of nodes to use InteractiveMode mode, the rest will be BatchMode"},
	}
	for _, f := range intFlags {
		flag.IntVar(f.target, f.name, f.value, f.usage)
	}

	// interval is the only int64 flag, so it is registered on its own.
	flag.Int64Var(&interval, "interval", 5, "seconds between messages")

	flag.Parse()
}
// main runs an in-process simulation: it creates nodeCount nodes, each with its
// own channel transport, connects every node to a random set of peers inside a
// single group, starts all nodes and finally hands a subset of them to chat,
// which appends messages forever.
func main() {
	parseFlags()

	// Reject flag values that would otherwise crash or hang the simulation.
	if nodeCount < 1 {
		log.Fatalf("invalid -nodes %d: at least one node is required", nodeCount)
	}
	if sharing < 0 || sharing >= nodeCount {
		// A node can share with at most nodeCount-1 distinct other nodes.
		log.Fatalf("invalid -sharing %d: must be between 0 and %d", sharing, nodeCount-1)
	}
	if communicating < 1 || communicating-1 > nodeCount {
		// Keeps the nodes[:communicating-1] slice expression below in range.
		log.Fatalf("invalid -communicating %d: must be between 1 and %d", communicating, nodeCount+1)
	}

	transports := make([]*transport.ChannelTransport, 0, nodeCount)
	inputs := make([]chan transport.Packet, 0, nodeCount)
	nodes := make([]*node.Node, 0, nodeCount)

	for i := 0; i < nodeCount; i++ {
		in := make(chan transport.Packet)
		t := transport.NewChannelTransport(offline, in)

		inputs = append(inputs, in)
		transports = append(transports, t)

		// NOTE(review): with this comparison only the first interactive-1 nodes
		// run in InteractiveMode, which looks off by one against the flag
		// description. Behaviour kept as-is; confirm the intent.
		mode := node.InteractiveMode
		if i+1 >= interactive {
			mode = node.BatchMode
		}

		n, err := createNode(t, peerID(), mode)
		if err != nil {
			// A nil node must never be stored: it is dereferenced below when
			// peers are wired up, so abort instead of continuing.
			log.Fatalf("Could not create node: %+v\n", err)
		}

		nodes = append(nodes, n)
	}

	group := groupId()

	// @todo add multiple groups, only one or so nodes in every group so there is overlap
	// @todo maybe dms?
	for i, n := range nodes {
		for _, p := range selectPeers(len(nodes), i, sharing) {
			peer := nodes[p].ID

			// Packets node i sends to this peer are delivered on the peer's input channel.
			transports[i].AddOutput(peer, inputs[p])

			// NOTE(review): the result of AddPeer is deliberately discarded, as
			// in the original; confirm whether a failure should be reported.
			_ = n.AddPeer(group, peer)

			log.Printf("%x sharing with %x", n.ID[:4], peer[:4])
		}
	}

	for _, n := range nodes {
		n.Start(1 * time.Second)
	}

	// NOTE(review): this hands communicating-1 nodes to chat, one fewer than the
	// flag description suggests. Behaviour kept as-is; confirm the intent.
	chat(group, nodes[:communicating-1]...)
}
// selectPeers picks up to sharing distinct node indexes in [0, nodeCount),
// never including currentNode, in random order.
//
// When fewer than sharing other nodes exist, every available index is returned
// instead of looping forever. A non-positive nodeCount or sharing yields an
// empty, non-nil slice.
func selectPeers(nodeCount int, currentNode int, sharing int) []int {
	if nodeCount <= 0 || sharing <= 0 {
		return make([]int, 0)
	}

	// Never reserve more room than there are nodes to choose from.
	capacity := sharing
	if capacity > nodeCount {
		capacity = nodeCount
	}
	selected := make([]int, 0, capacity)

	// A single random permutation gives distinct candidates without the
	// retry-on-duplicate loop and without re-seeding the global generator on
	// every draw.
	rng := math.New(math.NewSource(time.Now().UnixNano()))
	for _, candidate := range rng.Perm(nodeCount) {
		if candidate == currentNode {
			continue
		}

		selected = append(selected, candidate)
		if len(selected) == sharing {
			break
		}
	}

	return selected
}
// createNode builds a node with the given transport, peer ID and mode. The
// message store, sync state, peer persistence and dependency tracker all come
// from the NewMemory*/NewInMemory* constructors, and the node logs through a
// zap development logger.
//
// It returns an error only when the logger cannot be created.
func createNode(transport transport.Transport, id state.PeerID, mode node.Mode) (*node.Node, error) {
	messages := store.NewMemoryMessageStore()

	logger, err := zap.NewDevelopment()
	if err != nil {
		return nil, err
	}

	// Argument order follows node.NewNode's parameter list; Calc and the
	// literal 0 are passed through unchanged.
	n := node.NewNode(
		messages,
		transport,
		state.NewMemorySyncState(),
		Calc,
		0,
		id,
		mode,
		peers.NewMemoryPersistence(),
		dependency.NewInMemoryTracker(),
		node.EventualMode,
		logger,
	)

	return n, nil
}
// chat loops forever: it sleeps for interval seconds and then has every given
// node append a "<node id in hex> testing" message to group. Errors from
// appending are printed to standard output and do not stop the loop.
func chat(group state.GroupID, nodes ...*node.Node) {
	for {
		// interval is read on every round, exactly as the flag value was parsed.
		pause := time.Duration(interval) * time.Second
		time.Sleep(pause)

		for _, sender := range nodes {
			payload := []byte(fmt.Sprintf("%x testing", sender.ID))

			if _, err := sender.AppendMessage(group, payload); err != nil {
				fmt.Println(err)
			}
		}
	}
}
// Calc returns epoch advanced by twice count.
//
// It is handed to node.NewNode by createNode; presumably the node uses it to
// schedule the next send epoch from a send count — confirm against the node
// package.
func Calc(count uint64, epoch int64) int64 {
	// The doubling happens in uint64 before the conversion, as in the original.
	offset := int64(2 * count)
	return epoch + offset
}
// peerID returns a peer ID filled with bytes from crypto/rand.
func peerID() (id state.PeerID) {
	// The read result is deliberately ignored; on failure id keeps whatever
	// bytes were written, possibly none.
	buf := id[:]
	_, _ = rand.Read(buf)
	return id
}
// groupId returns a group ID filled with bytes from crypto/rand.
func groupId() (id state.GroupID) {
	// The read result is deliberately ignored; on failure id keeps whatever
	// bytes were written, possibly none.
	buf := id[:]
	_, _ = rand.Read(buf)
	return id
}