This commit is contained in:
decanus
2019-06-10 12:25:34 -07:00
22 changed files with 671 additions and 583 deletions

7
Makefile Normal file
View File

@@ -0,0 +1,7 @@
# Use bash for recipe shells.
SHELL := /bin/bash
# NOTE(review): without `export`, this is only a make variable and is not
# visible in recipe environments — confirm `export GO111MODULE = on` was
# intended (no go commands run in this Makefile yet, so it is currently inert).
GO111MODULE = on

# protobuf regenerates the Go bindings from the .proto definitions.
protobuf:
	protoc --go_out=. ./protobuf/*.proto

.PHONY: protobuf

View File

@@ -1,19 +1,23 @@
# Minimal Viable Data Sync
:warning: This code is not production ready, race conditions are likely to occur :warning:
Experimental implementation of the [minimal viable data sync protocol specification](https://notes.status.im/bZHk_BNkSAe8-TY7DxdNEg?view).
# Usage
## Usage
## Simulation
### Prerequisites
Ensure you have `protoc` (Protobuf) and Golang installed. Then run `make`.
### Simulation
In order to run a very naive simulation, use the simulation command. The simulation is configurable using various CLI flags.
```
Usage of simulation/simulation.go:
Usage of main.go:
-communicating int
amount of nodes sending messages (default 2)
-interactive int
amount of nodes to use INTERACTIVE mode, the rest will be BATCH (default 3)
-interval int
seconds between messages (default 5)
-nodes int
@@ -24,6 +28,6 @@ Usage of simulation/simulation.go:
amount of nodes each node shares with (default 2)
```
# License
## License
This project is licensed under the MIT License — see the [LICENSE](LICENSE) file for details.

View File

@@ -12,7 +12,11 @@ import (
"sync"
"time"
"github.com/status-im/mvds"
"github.com/status-im/mvds/node"
"github.com/status-im/mvds/protobuf"
"github.com/status-im/mvds/state"
"github.com/status-im/mvds/store"
"github.com/status-im/mvds/transport"
)
var (
@@ -21,20 +25,21 @@ var (
communicating int
sharing int
interval int64
interactive int
)
type Transport struct {
sync.Mutex
in <-chan mvds.Packet
out map[mvds.PeerId]chan<- mvds.Packet
in <-chan transport.Packet
out map[state.PeerID]chan<- transport.Packet
}
func (t *Transport) Watch() mvds.Packet {
func (t *Transport) Watch() transport.Packet {
return <-t.in
}
func (t *Transport) Send(group mvds.GroupID, sender mvds.PeerId, peer mvds.PeerId, payload mvds.Payload) error {
func (t *Transport) Send(group state.GroupID, sender state.PeerID, peer state.PeerID, payload protobuf.Payload) error {
math.Seed(time.Now().UnixNano())
if math.Intn(100) < offline {
return nil
@@ -45,7 +50,7 @@ func (t *Transport) Send(group mvds.GroupID, sender mvds.PeerId, peer mvds.PeerI
return errors.New("peer unknown")
}
c <- mvds.Packet{Group: group, Sender: sender, Payload: payload}
c <- transport.Packet{Group: group, Sender: sender, Payload: payload}
return nil
}
@@ -55,6 +60,7 @@ func init() {
flag.IntVar(&communicating, "communicating", 2, "amount of nodes sending messages")
flag.IntVar(&sharing, "sharing", 2, "amount of nodes each node shares with")
flag.Int64Var(&interval, "interval", 5, "seconds between messages")
flag.IntVar(&interactive, "interactive", 3, "amount of nodes to use INTERACTIVE mode, the rest will be BATCH") // @todo should probably just be how many nodes are interactive
flag.Parse()
}
@@ -63,19 +69,28 @@ func main() {
// @todo validate flags
transports := make([]*Transport, 0)
input := make([]chan mvds.Packet, 0)
nodes := make([]*mvds.Node, 0)
input := make([]chan transport.Packet, 0)
nodes := make([]*node.Node, 0)
for i := 0; i < nodeCount; i++ {
in := make(chan mvds.Packet)
in := make(chan transport.Packet)
transport := &Transport{
in: in,
out: make(map[mvds.PeerId]chan<- mvds.Packet),
out: make(map[state.PeerID]chan<- transport.Packet),
}
input = append(input, in)
transports = append(transports, transport)
nodes = append(nodes, createNode(transport, peerId()))
mode := node.INTERACTIVE
if i+1 >= interactive {
mode = node.BATCH
}
nodes = append(
nodes,
createNode(transport, peerID(), mode),
)
}
group := groupId()
@@ -89,9 +104,8 @@ func main() {
transports[i].out[peer] = input[p]
n.AddPeer(group, peer)
n.Share(group, peer)
log.Printf("%x sharing with %x", n.ID.ToBytes()[:4], peer.ToBytes()[:4])
log.Printf("%x sharing with %x", n.ID[:4], peer[:4])
}
}
@@ -106,11 +120,7 @@ func selectPeers(nodeCount int, currentNode int, sharing int) []int {
peers := make([]int, 0)
OUTER:
for {
if len(peers) == sharing {
break
}
for len(peers) != sharing {
math.Seed(time.Now().UnixNano())
i := math.Intn(nodeCount)
if i == currentNode {
@@ -129,12 +139,19 @@ OUTER:
return peers
}
func createNode(transport *Transport, id mvds.PeerId) *mvds.Node {
ds := mvds.NewDummyStore()
return mvds.NewNode(&ds, transport, Calc, id)
func createNode(transport *Transport, id state.PeerID, mode node.Mode) *node.Node {
ds := store.NewDummyStore()
return node.NewNode(
&ds,
transport,
state.NewSyncState(),
Calc,
id,
mode,
)
}
func chat(group mvds.GroupID, nodes ...*mvds.Node) {
func chat(group state.GroupID, nodes ...*node.Node) {
for {
time.Sleep(time.Duration(interval) * time.Second)
@@ -151,16 +168,16 @@ func Calc(count uint64, epoch int64) int64 {
return epoch + int64(count*2)
}
func peerId() mvds.PeerId {
func peerID() state.PeerID {
key, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
return mvds.PeerId(key.PublicKey)
return state.PublicKeyToPeerID(key.PublicKey)
}
func groupId() mvds.GroupID {
func groupId() state.GroupID {
bytes := make([]byte, 32)
rand.Read(bytes)
id := mvds.GroupID{}
id := state.GroupID{}
copy(id[:], bytes)
return id

View File

@@ -1,7 +0,0 @@
package mvds
type MessageStore interface {
Has(id MessageID) bool
Get(id MessageID) (Message, error)
Add(message Message) error
}

View File

@@ -1,41 +0,0 @@
package mvds
import (
"errors"
"sync"
)
type DummyStore struct {
sync.Mutex
ms map[MessageID]Message
}
func NewDummyStore() DummyStore {
return DummyStore{ms: make(map[MessageID]Message)}
}
func (ds *DummyStore) Has(id MessageID) bool {
ds.Lock()
defer ds.Unlock()
_, ok := ds.ms[id]; return ok
}
func (ds *DummyStore) Get(id MessageID) (Message, error) {
ds.Lock()
defer ds.Unlock()
m, ok := ds.ms[id]
if !ok {
return Message{}, errors.New("message does not exist")
}
return m, nil
}
func (ds *DummyStore) Add(message Message) error {
ds.Lock()
defer ds.Unlock()
ds.ms[message.ID()] = message
return nil
}

273
node.go
View File

@@ -1,273 +0,0 @@
package mvds
// @todo this is a very rough implementation that needs cleanup
import (
"fmt"
"log"
"sync/atomic"
"time"
)
type calculateNextEpoch func(count uint64, epoch int64) int64
type Node struct {
store MessageStore
transport Transport
syncState syncState
sharing map[GroupID][]PeerId
peers map[GroupID][]PeerId
payloads payloads
nextEpoch calculateNextEpoch
ID PeerId
epoch int64
}
func NewNode(ms MessageStore, st Transport, nextEpoch calculateNextEpoch, id PeerId) *Node {
return &Node{
store: ms,
transport: st,
syncState: newSyncState(),
sharing: make(map[GroupID][]PeerId),
peers: make(map[GroupID][]PeerId),
payloads: payloads{payloads: make(map[GroupID]map[PeerId]Payload)},
nextEpoch: nextEpoch,
ID: id,
epoch: 0,
}
}
// Run listens for new messages received by the node and sends out those required every tick.
func (n *Node) Run() {
// this will be completely legitimate with new payload handling
go func() {
for {
p := n.transport.Watch()
go n.onPayload(p.Group, p.Sender, p.Payload)
}
}()
go func() {
for {
log.Printf("Node: %x Epoch: %d", n.ID.ToBytes()[:4], n.epoch)
time.Sleep(1 * time.Second)
n.sendMessages()
atomic.AddInt64(&n.epoch, 1)
}
}()
}
// AppendMessage sends a message to a given group.
func (n *Node) AppendMessage(group GroupID, data []byte) (MessageID, error) {
m := Message{
GroupId: group[:],
Timestamp: time.Now().Unix(),
Body: data,
}
id := m.ID()
peers, ok := n.peers[group]
if !ok {
return MessageID{}, fmt.Errorf("trying to send to unknown group %x", group[:4])
}
err := n.store.Add(m)
if err != nil {
return MessageID{}, err
}
go func () {
for _, p := range peers {
if !n.IsPeerInGroup(group, p) {
continue
}
s := state{}
s.SendEpoch = n.epoch + 1
n.syncState.Set(group, id, p, s)
}
}()
log.Printf("[%x] node %x sending %x\n", group[:4], n.ID.ToBytes()[:4], id[:4])
// @todo think about a way to insta trigger send messages when send was selected, we don't wanna wait for ticks here
return id, nil
}
// AddPeer adds a peer to a specific group making it a recipient of messages
func (n *Node) AddPeer(group GroupID, id PeerId) {
if _, ok := n.peers[group]; !ok {
n.peers[group] = make([]PeerId, 0)
}
n.peers[group] = append(n.peers[group], id)
}
func (n *Node) Share(group GroupID, id PeerId) {
if _, ok := n.sharing[group]; !ok {
n.sharing[group] = make([]PeerId, 0)
}
n.sharing[group] = append(n.sharing[group], id)
}
func (n Node) IsPeerInGroup(g GroupID, p PeerId) bool {
for _, peer := range n.sharing[g] {
if peer == p {
return true
}
}
return false
}
func (n *Node) sendMessages() {
n.syncState.Map(func(g GroupID, m MessageID, p PeerId, s state) state {
if s.SendEpoch < n.epoch || !n.IsPeerInGroup(g, p) {
return s
}
n.payloads.AddOffers(g, p, m[:])
return n.updateSendEpoch(s)
})
n.payloads.Map(func(id GroupID, peer PeerId, payload Payload) {
err := n.transport.Send(id, n.ID, peer, payload)
if err != nil {
log.Printf("error sending message: %s", err.Error())
// @todo
}
})
n.payloads.RemoveAll()
}
func (n *Node) onPayload(group GroupID, sender PeerId, payload Payload) {
if payload.Ack != nil {
n.onAck(group, sender, *payload.Ack)
}
if payload.Request != nil {
n.payloads.AddMessages(group, sender, n.onRequest(group, sender, *payload.Request)...)
}
if payload.Messages != nil {
n.payloads.AddAcks(group, sender, n.onMessages(group, sender, payload.Messages)...)
}
if payload.Offer != nil {
requests, acks := n.onOffer(group, sender, *payload.Offer)
n.payloads.AddAcks(group, sender, acks...)
n.payloads.AddRequests(group, sender, requests...)
}
}
func (n *Node) onOffer(group GroupID, sender PeerId, msg Offer) ([][]byte, [][]byte) {
requests := make([][]byte, 0)
acks := make([][]byte, 0)
for _, raw := range msg.Id {
id := toMessageID(raw)
log.Printf("[%x] OFFER (%x -> %x): %x received.\n", group[:4], sender.ToBytes()[:4], n.ID.ToBytes()[:4], id[:4])
if n.store.Has(id) {
acks = append(acks, raw)
continue
}
requests = append(requests, raw)
log.Printf("[%x] sending REQUEST (%x -> %x): %x\n", group[:4], n.ID.ToBytes()[:4], sender.ToBytes()[:4], id[:4])
}
return requests, acks
}
func (n *Node) onRequest(group GroupID, sender PeerId, msg Request) []*Message {
m := make([]*Message, 0)
for _, raw := range msg.Id {
id := toMessageID(raw)
log.Printf("[%x] REQUEST (%x -> %x): %x received.\n", group[:4], sender.ToBytes()[:4], n.ID.ToBytes()[:4], id[:4])
if !n.IsPeerInGroup(group, sender) {
continue
}
message, err := n.store.Get(id)
if err != nil {
log.Printf("error requesting message %x", id[:4])
continue
}
n.syncState.Set(group, id, sender, n.updateSendEpoch(n.syncState.Get(group, id, sender)))
m = append(m, &message)
log.Printf("[%x] sending MESSAGE (%x -> %x): %x\n", group[:4], n.ID.ToBytes()[:4], sender.ToBytes()[:4], id[:4])
}
return m
}
func (n *Node) onAck(group GroupID, sender PeerId, msg Ack) {
for _, raw := range msg.Id {
id := toMessageID(raw)
n.syncState.Remove(group, id, sender)
log.Printf("[%x] ACK (%x -> %x): %x received.\n", group[:4], sender.ToBytes()[:4], n.ID.ToBytes()[:4], id[:4])
}
}
func (n *Node) onMessages(group GroupID, sender PeerId, messages []*Message) [][]byte {
a := make([][]byte, 0)
for _, m := range messages {
err := n.onMessage(group, sender, *m)
if err != nil {
// @todo
continue
}
id := m.ID()
log.Printf("[%x] sending ACK (%x -> %x): %x\n", group[:4], n.ID.ToBytes()[:4], sender.ToBytes()[:4], id[:4])
a = append(a, id[:])
}
return a
}
func (n *Node) onMessage(group GroupID, sender PeerId, msg Message) error {
id := msg.ID()
log.Printf("[%x] MESSAGE (%x -> %x): %x received.\n", group[:4], sender.ToBytes()[:4], n.ID.ToBytes()[:4], id[:4])
// @todo share message with those around us
err := n.store.Add(msg)
if err != nil {
return err
// @todo process, should this function ever even have an error?
}
return nil
}
func (n Node) updateSendEpoch(s state) state {
s.SendCount += 1
s.SendEpoch += n.nextEpoch(s.SendCount, n.epoch)
return s
}
func toMessageID(b []byte) MessageID {
var id MessageID
copy(id[:], b)
return id
}

309
node/node.go Normal file
View File

@@ -0,0 +1,309 @@
package node
// @todo this is a very rough implementation that needs cleanup
import (
"bytes"
"fmt"
"log"
"sync/atomic"
"time"
"github.com/status-im/mvds/protobuf"
"github.com/status-im/mvds/state"
"github.com/status-im/mvds/store"
"github.com/status-im/mvds/transport"
)
// Mode selects how a node propagates messages to its peers.
type Mode string

const (
	// INTERACTIVE nodes first OFFER message IDs and transmit the full
	// message only after the peer REQUESTs it.
	INTERACTIVE Mode = "interactive"
	// BATCH nodes push full messages to peers immediately.
	BATCH Mode = "batch"
)

// calculateNextEpoch computes the next send epoch from the number of
// send attempts so far and the current epoch.
type calculateNextEpoch func(count uint64, epoch int64) int64

// Node is a minimal-viable-data-sync participant: it stores messages,
// tracks per-(group, message, peer) sync state, and exchanges payloads
// over a transport.
type Node struct {
	store     store.MessageStore
	transport transport.Transport

	syncState state.SyncState

	peers map[state.GroupID][]state.PeerID // group membership known to this node

	payloads payloads // outgoing payloads accumulated until the next flush

	nextEpoch calculateNextEpoch // retransmission scheduling policy

	ID state.PeerID

	epoch int64 // incremented atomically by Run; read in several places without synchronization
	mode  Mode
}
// NewNode creates a Node wired to the given message store, transport,
// sync state, epoch calculator, peer identity and sync mode.
func NewNode(ms store.MessageStore, st transport.Transport, ss state.SyncState, nextEpoch calculateNextEpoch, id state.PeerID, mode Mode) *Node {
	n := Node{
		store:     ms,
		transport: st,
		syncState: ss,
		peers:     make(map[state.GroupID][]state.PeerID),
		payloads:  newPayloads(),
		nextEpoch: nextEpoch,
		ID:        id,
		epoch:     0,
		mode:      mode,
	}
	return &n
}
// Run listens for new messages received by the node and sends out those required every epoch.
//
// It starts two goroutines that never terminate: one dispatches every
// packet seen on the transport to onPayload (each in its own goroutine),
// the other ticks once per second, flushes pending payloads via
// sendMessages, and atomically advances the epoch.
// NOTE(review): there is no way to stop these goroutines — consider a
// context or quit channel.
func (n *Node) Run() {
	go func() {
		for {
			p := n.transport.Watch()
			go n.onPayload(p.Group, p.Sender, p.Payload)
		}
	}()

	go func() {
		for {
			// n.epoch is read here without atomic load — benign for logging only.
			log.Printf("Node: %x Epoch: %d", n.ID[:4], n.epoch)
			time.Sleep(1 * time.Second)
			n.sendMessages()
			atomic.AddInt64(&n.epoch, 1)
		}
	}()
}
// AppendMessage sends a message to a given group.
//
// The message is stored locally and, depending on the node's mode,
// either scheduled for offering on the next epoch (INTERACTIVE) or
// queued for direct delivery (BATCH) to every peer of the group.
// Returns the new message's ID, or an error when the group is unknown
// or the store rejects the message.
func (n *Node) AppendMessage(group state.GroupID, data []byte) (state.MessageID, error) {
	m := protobuf.Message{
		GroupId:   group[:],
		Timestamp: time.Now().Unix(),
		Body:      data,
	}

	id := state.ID(m)

	peers, ok := n.peers[group]
	if !ok {
		return state.MessageID{}, fmt.Errorf("trying to send to unknown group %x", group[:4])
	}

	err := n.store.Add(m)
	if err != nil {
		return state.MessageID{}, err
	}

	go func() {
		for _, p := range peers {
			if !n.IsPeerInGroup(group, p) {
				continue
			}

			if n.mode == INTERACTIVE {
				s := state.State{}
				// NOTE(review): unsynchronized read of n.epoch while Run's
				// ticker increments it atomically — confirm acceptable.
				s.SendEpoch = n.epoch + 1
				err := n.syncState.Set(group, id, p, s)
				if err != nil {
					log.Printf("error while setting sync state %s", err.Error())
				}
				// BUG FIX: this was `return`, which exited the goroutine after
				// the first eligible peer so the remaining peers never got
				// sync state and were never offered the message.
				continue
			}

			if n.mode == BATCH {
				n.payloads.AddMessages(group, p, &m)
				log.Printf("[%x] sending MESSAGE (%x -> %x): %x\n", group[:4], n.ID[:4], p[:4], id[:4])
			}
		}
	}()

	log.Printf("[%x] node %x sending %x\n", group[:4], n.ID[:4], id[:4])

	// @todo think about a way to insta trigger send messages when send was selected, we don't wanna wait for ticks here
	return id, nil
}
// AddPeer adds a peer to a specific group making it a recipient of messages.
func (n *Node) AddPeer(group state.GroupID, id state.PeerID) {
	members, ok := n.peers[group]
	if !ok {
		members = make([]state.PeerID, 0)
	}
	n.peers[group] = append(members, id)
}
// IsPeerInGroup reports whether peer p is a registered member of group g.
func (n Node) IsPeerInGroup(g state.GroupID, p state.PeerID) bool {
	members := n.peers[g]
	for i := range members {
		if bytes.Equal(members[i][:], p[:]) {
			return true
		}
	}
	return false
}
// sendMessages walks the sync state, turning every eligible entry into
// an OFFER for the owning peer and advancing its send epoch, then
// flushes all accumulated payloads over the transport.
func (n *Node) sendMessages() {
	err := n.syncState.Map(func(g state.GroupID, m state.MessageID, p state.PeerID, s state.State) state.State {
		// Entries whose SendEpoch has fallen behind the current epoch, or
		// whose peer is no longer in the group, are left untouched.
		// NOTE(review): this offers entries with SendEpoch >= n.epoch and
		// skips those in the past — verify this matches the intended
		// retransmission schedule together with updateSendEpoch.
		if s.SendEpoch < n.epoch || !n.IsPeerInGroup(g, p) {
			return s
		}
		n.payloads.AddOffers(g, p, m[:])
		return n.updateSendEpoch(s)
	})
	if err != nil {
		log.Printf("error while mapping sync state: %s", err.Error())
	}

	n.payloads.MapAndClear(func(id state.GroupID, peer state.PeerID, payload protobuf.Payload) {
		err := n.transport.Send(id, n.ID, peer, payload)
		if err != nil {
			log.Printf("error sending message: %s", err.Error())
			// @todo
		}
	})
}
// onPayload dispatches one incoming payload to the per-section handlers.
// Any responses (messages for requests, acks/requests for offers, acks
// for messages) are accumulated into n.payloads and flushed on the next
// sendMessages tick.
func (n *Node) onPayload(group state.GroupID, sender state.PeerID, payload protobuf.Payload) {
	if payload.Ack != nil {
		n.onAck(group, sender, *payload.Ack)
	}

	if payload.Request != nil {
		n.payloads.AddMessages(group, sender, n.onRequest(group, sender, *payload.Request)...)
	}

	if payload.Offer != nil {
		requests, acks := n.onOffer(group, sender, *payload.Offer)
		n.payloads.AddAcks(group, sender, acks...)
		n.payloads.AddRequests(group, sender, requests...)
	}

	if payload.Messages != nil {
		n.payloads.AddAcks(group, sender, n.onMessages(group, sender, payload.Messages)...)
	}
}
// onOffer handles an incoming OFFER: IDs already present in the local
// store are acknowledged, unknown ones are requested from the sender.
func (n *Node) onOffer(group state.GroupID, sender state.PeerID, msg protobuf.Offer) ([][]byte, [][]byte) {
	var (
		requests = make([][]byte, 0)
		acks     = make([][]byte, 0)
	)

	for _, raw := range msg.Id {
		id := toMessageID(raw)
		log.Printf("[%x] OFFER (%x -> %x): %x received.\n", group[:4], sender[:4], n.ID[:4], id[:4])

		if n.store.Has(id) {
			acks = append(acks, raw)
			continue
		}

		requests = append(requests, raw)
		log.Printf("[%x] sending REQUEST (%x -> %x): %x\n", group[:4], n.ID[:4], sender[:4], id[:4])
	}

	return requests, acks
}
// onRequest handles an incoming REQUEST: for every requested ID that the
// sender is entitled to (peer is in the group) and that exists in the
// store, the message is collected for delivery and its sync state is
// advanced to schedule a retransmit.
func (n *Node) onRequest(group state.GroupID, sender state.PeerID, msg protobuf.Request) []*protobuf.Message {
	m := make([]*protobuf.Message, 0)

	for _, raw := range msg.Id {
		id := toMessageID(raw)
		log.Printf("[%x] REQUEST (%x -> %x): %x received.\n", group[:4], sender[:4], n.ID[:4], id[:4])

		if !n.IsPeerInGroup(group, sender) {
			log.Printf("[%x] peer %x is not in group", group[:4], sender[:4])
			continue
		}

		message, err := n.store.Get(id)
		if err != nil {
			log.Printf("error requesting message %x", id[:4])
			continue
		}

		// @todo this should probably change the sync state to retransmit messages rather than offers
		s, err := n.syncState.Get(group, id, sender)
		if err != nil {
			log.Printf("error (%s) getting sync state group: %x id: %x peer: %x", err.Error(), group[:4], id[:4], sender[:4])
			continue
		}

		err = n.syncState.Set(group, id, sender, n.updateSendEpoch(s))
		if err != nil {
			log.Printf("error (%s) setting sync state group: %x id: %x peer: %x", err.Error(), group[:4], id[:4], sender[:4])
			continue
		}

		m = append(m, &message)
		log.Printf("[%x] sending MESSAGE (%x -> %x): %x\n", group[:4], n.ID[:4], sender[:4], id[:4])
	}

	return m
}
// onAck handles an incoming ACK by dropping the matching sync state so
// the acknowledged message is no longer retransmitted to the sender.
func (n *Node) onAck(group state.GroupID, sender state.PeerID, msg protobuf.Ack) {
	for _, raw := range msg.Id {
		id := toMessageID(raw)
		if err := n.syncState.Remove(group, id, sender); err != nil {
			log.Printf("error while removing sync state %s", err.Error())
			continue
		}
		log.Printf("[%x] ACK (%x -> %x): %x received.\n", group[:4], sender[:4], n.ID[:4], id[:4])
	}
}
// onMessages stores each received message and collects the IDs of the
// ones stored successfully so the caller can acknowledge them.
func (n *Node) onMessages(group state.GroupID, sender state.PeerID, messages []*protobuf.Message) [][]byte {
	acks := make([][]byte, 0)

	for _, msg := range messages {
		if err := n.onMessage(group, sender, *msg); err != nil {
			// @todo
			continue
		}

		id := state.ID(*msg)
		log.Printf("[%x] sending ACK (%x -> %x): %x\n", group[:4], n.ID[:4], sender[:4], id[:4])
		acks = append(acks, id[:])
	}

	return acks
}
// onMessage persists a single received message into the store. Returns
// the store's error, if any; acknowledgment is the caller's job.
func (n *Node) onMessage(group state.GroupID, sender state.PeerID, msg protobuf.Message) error {
	id := state.ID(msg)
	log.Printf("[%x] MESSAGE (%x -> %x): %x received.\n", group[:4], sender[:4], n.ID[:4], id[:4])

	// @todo share message with those around us

	err := n.store.Add(msg)
	if err != nil {
		return err
		// @todo process, should this function ever even have an error?
	}

	return nil
}
// updateSendEpoch bumps the retransmission counter and pushes the send
// epoch forward using the configured back-off function.
// NOTE(review): `+=` adds nextEpoch's result (which itself incorporates
// n.epoch, e.g. the simulation's Calc returns epoch + count*2) to the
// previous SendEpoch, so the schedule grows roughly quadratically —
// confirm plain `=` was not intended.
func (n Node) updateSendEpoch(s state.State) state.State {
	s.SendCount += 1
	s.SendEpoch += n.nextEpoch(s.SendCount, n.epoch)
	return s
}
// toMessageID copies a raw byte slice into a fixed-size MessageID.
// Slices shorter than the ID leave the tail zeroed; longer ones are
// truncated (copy semantics).
func toMessageID(b []byte) state.MessageID {
	id := state.MessageID{}
	copy(id[:], b)
	return id
}

103
node/payloads.go Normal file
View File

@@ -0,0 +1,103 @@
package node
import (
"sync"
"github.com/status-im/mvds/protobuf"
"github.com/status-im/mvds/state"
)
// payloads accumulates, per group and peer, the protobuf payload to be
// sent on the next flush. All methods are safe for concurrent use via
// the embedded mutex.
type payloads struct {
	sync.Mutex
	payloads map[state.GroupID]map[state.PeerID]protobuf.Payload
}

// @todo check in all the functions below that we aren't duplicating stuff

// newPayloads returns an empty, ready-to-use payloads accumulator.
func newPayloads() payloads {
	return payloads{
		payloads: make(map[state.GroupID]map[state.PeerID]protobuf.Payload),
	}
}
// AddOffers appends message ID offers to the pending payload for the
// given group and peer.
func (p *payloads) AddOffers(group state.GroupID, peer state.PeerID, offers ...[]byte) {
	p.Lock()
	defer p.Unlock()

	pld := p.get(group, peer)
	if pld.Offer == nil {
		pld.Offer = &protobuf.Offer{Id: make([][]byte, 0)}
	}
	pld.Offer.Id = append(pld.Offer.Id, offers...)

	p.set(group, peer, pld)
}
// AddAcks appends message ID acknowledgments to the pending payload for
// the given group and peer.
func (p *payloads) AddAcks(group state.GroupID, peer state.PeerID, acks ...[]byte) {
	p.Lock()
	defer p.Unlock()

	pld := p.get(group, peer)
	if pld.Ack == nil {
		pld.Ack = &protobuf.Ack{Id: make([][]byte, 0)}
	}
	pld.Ack.Id = append(pld.Ack.Id, acks...)

	p.set(group, peer, pld)
}
// AddRequests appends message ID requests to the pending payload for the
// given group and peer.
func (p *payloads) AddRequests(group state.GroupID, peer state.PeerID, request ...[]byte) {
	p.Lock()
	defer p.Unlock()

	pld := p.get(group, peer)
	if pld.Request == nil {
		pld.Request = &protobuf.Request{Id: make([][]byte, 0)}
	}
	pld.Request.Id = append(pld.Request.Id, request...)

	p.set(group, peer, pld)
}
// AddMessages appends full messages to the pending payload for the given
// group and peer.
func (p *payloads) AddMessages(group state.GroupID, peer state.PeerID, messages ...*protobuf.Message) {
	p.Lock()
	defer p.Unlock()

	pld := p.get(group, peer)
	if pld.Messages == nil {
		pld.Messages = make([]*protobuf.Message, 0)
	}
	pld.Messages = append(pld.Messages, messages...)

	p.set(group, peer, pld)
}
// MapAndClear invokes f for every accumulated (group, peer, payload)
// triple, then resets the accumulator. The lock is held for the entire
// iteration, so f must not call back into this payloads instance.
func (p *payloads) MapAndClear(f func(state.GroupID, state.PeerID, protobuf.Payload)) {
	p.Lock()
	defer p.Unlock()

	for g, payloads := range p.payloads {
		for peer, payload := range payloads {
			f(g, peer, payload)
		}
	}

	// Drop everything that was just delivered; a fresh outer map also
	// releases the inner maps.
	p.payloads = make(map[state.GroupID]map[state.PeerID]protobuf.Payload)
}
// get returns the payload currently accumulated for the given group and
// peer, or the zero Payload when none exists. Callers must hold p's lock.
func (p *payloads) get(id state.GroupID, peer state.PeerID) protobuf.Payload {
	// Indexing a missing (even nil) inner map safely yields the zero
	// value, so the original's redundant comma-ok form (`payload, _ :=`,
	// flagged by staticcheck S1005) is unnecessary.
	return p.payloads[id][peer]
}
// set stores the payload for the given group and peer, lazily creating
// the inner map. Callers must hold p's lock.
func (p *payloads) set(id state.GroupID, peer state.PeerID, payload protobuf.Payload) {
	if _, ok := p.payloads[id]; !ok {
		p.payloads[id] = make(map[state.PeerID]protobuf.Payload)
	}
	p.payloads[id][peer] = payload
}

View File

@@ -1,101 +0,0 @@
package mvds
import (
"sync"
)
type payloads struct {
sync.Mutex
payloads map[GroupID]map[PeerId]Payload
}
// @todo check in all the functions below that we aren't duplicating stuff
func (p *payloads) AddOffers(group GroupID, peer PeerId, offers ...[]byte) {
p.Lock()
defer p.Unlock()
payload := p.get(group, peer)
if payload.Offer == nil {
payload.Offer = &Offer{Id: make([][]byte, 0)}
}
payload.Offer.Id = append(payload.Offer.Id, offers...)
p.set(group, peer, payload)
}
func (p *payloads) AddAcks(group GroupID, peer PeerId, acks ...[]byte) {
p.Lock()
defer p.Unlock()
payload := p.get(group, peer)
if payload.Ack == nil {
payload.Ack = &Ack{Id: make([][]byte, 0)}
}
payload.Ack.Id = append(payload.Ack.Id, acks...)
p.set(group, peer, payload)
}
func (p *payloads) AddRequests(group GroupID, peer PeerId, request ...[]byte) {
p.Lock()
defer p.Unlock()
payload := p.get(group, peer)
if payload.Request == nil {
payload.Request = &Request{Id: make([][]byte, 0)}
}
payload.Request.Id = append(payload.Request.Id, request...)
p.set(group, peer, payload)
}
func (p *payloads) AddMessages(group GroupID, peer PeerId, messages ...*Message) {
p.Lock()
defer p.Unlock()
payload := p.get(group, peer)
if payload.Messages == nil {
payload.Messages = make([]*Message, 0)
}
payload.Messages = append(payload.Messages, messages...)
p.set(group, peer, payload)
}
func (p *payloads) Map(f func(GroupID, PeerId, Payload)) {
p.Lock()
defer p.Unlock()
for g, payloads := range p.payloads {
for peer, payload := range payloads {
f(g, peer, payload)
}
}
p.payloads = make(map[GroupID]map[PeerId]Payload)
}
func (p *payloads) RemoveAll() {
p.Lock()
defer p.Unlock()
p.payloads = make(map[GroupID]map[PeerId]Payload)
}
func (p *payloads) get(id GroupID, peerId PeerId) Payload {
payload, _ := p.payloads[id][peerId]
return payload
}
func (p *payloads) set(id GroupID, peerId PeerId, payload Payload) {
_, ok := p.payloads[id]
if !ok {
p.payloads[id] = make(map[PeerId]Payload)
}
p.payloads[id][peerId] = payload
}

View File

@@ -1,18 +0,0 @@
package mvds
import (
"crypto/ecdsa"
"crypto/elliptic"
"github.com/ethereum/go-ethereum/crypto"
)
type PeerId ecdsa.PublicKey
func (p PeerId) ToBytes() []byte {
if p.X == nil || p.Y == nil {
return nil
}
return elliptic.Marshal(crypto.S256(), p.X, p.Y)
}

View File

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: sync.proto
// source: protobuf/sync.proto
package mvds
package protobuf
import (
fmt "fmt"
@@ -34,7 +34,7 @@ func (m *Payload) Reset() { *m = Payload{} }
func (m *Payload) String() string { return proto.CompactTextString(m) }
func (*Payload) ProtoMessage() {}
func (*Payload) Descriptor() ([]byte, []int) {
return fileDescriptor_5273b98214de8075, []int{0}
return fileDescriptor_2dca527c092c79d7, []int{0}
}
func (m *Payload) XXX_Unmarshal(b []byte) error {
@@ -94,7 +94,7 @@ func (m *Ack) Reset() { *m = Ack{} }
func (m *Ack) String() string { return proto.CompactTextString(m) }
func (*Ack) ProtoMessage() {}
func (*Ack) Descriptor() ([]byte, []int) {
return fileDescriptor_5273b98214de8075, []int{1}
return fileDescriptor_2dca527c092c79d7, []int{1}
}
func (m *Ack) XXX_Unmarshal(b []byte) error {
@@ -135,7 +135,7 @@ func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {}
func (*Message) Descriptor() ([]byte, []int) {
return fileDescriptor_5273b98214de8075, []int{2}
return fileDescriptor_2dca527c092c79d7, []int{2}
}
func (m *Message) XXX_Unmarshal(b []byte) error {
@@ -188,7 +188,7 @@ func (m *Offer) Reset() { *m = Offer{} }
func (m *Offer) String() string { return proto.CompactTextString(m) }
func (*Offer) ProtoMessage() {}
func (*Offer) Descriptor() ([]byte, []int) {
return fileDescriptor_5273b98214de8075, []int{3}
return fileDescriptor_2dca527c092c79d7, []int{3}
}
func (m *Offer) XXX_Unmarshal(b []byte) error {
@@ -227,7 +227,7 @@ func (m *Request) Reset() { *m = Request{} }
func (m *Request) String() string { return proto.CompactTextString(m) }
func (*Request) ProtoMessage() {}
func (*Request) Descriptor() ([]byte, []int) {
return fileDescriptor_5273b98214de8075, []int{4}
return fileDescriptor_2dca527c092c79d7, []int{4}
}
func (m *Request) XXX_Unmarshal(b []byte) error {
@@ -263,24 +263,25 @@ func init() {
proto.RegisterType((*Request)(nil), "mvds.Request")
}
func init() { proto.RegisterFile("sync.proto", fileDescriptor_5273b98214de8075) }
func init() { proto.RegisterFile("protobuf/sync.proto", fileDescriptor_2dca527c092c79d7) }
var fileDescriptor_5273b98214de8075 = []byte{
// 244 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x90, 0xb1, 0x4e, 0xc3, 0x30,
0x10, 0x40, 0x95, 0x38, 0xc5, 0xed, 0x35, 0x30, 0x9c, 0x84, 0x70, 0x05, 0x43, 0xc8, 0x42, 0x58,
0x32, 0xc0, 0x17, 0x74, 0x64, 0x40, 0xa0, 0x1b, 0x58, 0x91, 0x1b, 0xbb, 0x55, 0x14, 0x82, 0x43,
0x9c, 0x22, 0xe5, 0x63, 0xf8, 0x57, 0x94, 0x73, 0xa1, 0x12, 0x6c, 0xf6, 0x7b, 0x4f, 0xf2, 0x9d,
0x01, 0xfc, 0xf8, 0x5e, 0x95, 0x5d, 0xef, 0x06, 0x87, 0x49, 0xfb, 0x69, 0x7c, 0xfe, 0x15, 0x81,
0x7c, 0xd6, 0xe3, 0x9b, 0xd3, 0x06, 0x2f, 0x41, 0xe8, 0xaa, 0x51, 0x51, 0x16, 0x15, 0xcb, 0xbb,
0x45, 0x39, 0xf9, 0x72, 0x5d, 0x35, 0x34, 0x51, 0xbc, 0x86, 0x99, 0xdb, 0x6e, 0x6d, 0xaf, 0x62,
0xd6, 0xcb, 0xa0, 0x9f, 0x26, 0x44, 0xc1, 0xe0, 0x0d, 0xc8, 0xde, 0x7e, 0xec, 0xad, 0x1f, 0x94,
0xe0, 0xe8, 0x34, 0x44, 0x14, 0x20, 0xfd, 0x58, 0xbc, 0x85, 0x79, 0x6b, 0xbd, 0xd7, 0x3b, 0xeb,
0x55, 0x92, 0x89, 0x63, 0xf9, 0x18, 0x28, 0xfd, 0xea, 0xfc, 0x1c, 0xc4, 0xba, 0x6a, 0xf0, 0x0c,
0xe2, 0xda, 0xa8, 0x28, 0x13, 0x45, 0x4a, 0x71, 0x6d, 0xf2, 0x17, 0x90, 0x87, 0x16, 0x57, 0x30,
0xdf, 0xf5, 0x6e, 0xdf, 0xbd, 0x72, 0x10, 0x15, 0x29, 0x49, 0xbe, 0x3f, 0x18, 0xbc, 0x82, 0xc5,
0x50, 0xb7, 0xd6, 0x0f, 0xba, 0xed, 0x78, 0x6e, 0x41, 0x47, 0x80, 0x08, 0xc9, 0xc6, 0x99, 0x91,
0x67, 0x4d, 0x89, 0xcf, 0xf9, 0x05, 0xcc, 0x78, 0xa5, 0x7f, 0x0f, 0xae, 0x40, 0x1e, 0xd6, 0xf8,
0xab, 0x36, 0x27, 0xfc, 0x9f, 0xf7, 0xdf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x9a, 0xc1, 0x4e, 0x78,
0x5d, 0x01, 0x00, 0x00,
var fileDescriptor_2dca527c092c79d7 = []byte{
// 258 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x90, 0x41, 0x4f, 0xb4, 0x30,
0x10, 0x86, 0x03, 0x65, 0x3f, 0xd8, 0x59, 0x3e, 0x0f, 0x63, 0x8c, 0xdd, 0xe8, 0x01, 0xb9, 0x88,
0x17, 0x4c, 0xf4, 0x17, 0xac, 0x37, 0x0f, 0x46, 0xd3, 0x83, 0x07, 0x2f, 0xa6, 0xd0, 0xb2, 0x21,
0x88, 0x45, 0x0a, 0x26, 0xfc, 0x18, 0xff, 0xab, 0x61, 0xba, 0xeb, 0x26, 0x7a, 0x9b, 0x79, 0x9f,
0x27, 0xe9, 0xbc, 0x85, 0xe3, 0xae, 0x37, 0x83, 0x29, 0xc6, 0xea, 0xda, 0x4e, 0xef, 0x65, 0x4e,
0x1b, 0x06, 0xed, 0xa7, 0xb2, 0xe9, 0x97, 0x07, 0xe1, 0x93, 0x9c, 0xde, 0x8c, 0x54, 0x78, 0x06,
0x4c, 0x96, 0x0d, 0xf7, 0x12, 0x2f, 0x5b, 0xdd, 0x2c, 0xf3, 0x99, 0xe7, 0x9b, 0xb2, 0x11, 0x73,
0x8a, 0x17, 0xb0, 0x30, 0x55, 0xa5, 0x7b, 0xee, 0x13, 0x5e, 0x39, 0xfc, 0x38, 0x47, 0xc2, 0x11,
0xbc, 0x84, 0xb0, 0xd7, 0x1f, 0xa3, 0xb6, 0x03, 0x67, 0x24, 0xfd, 0x77, 0x92, 0x70, 0xa1, 0xd8,
0x53, 0xbc, 0x82, 0xa8, 0xd5, 0xd6, 0xca, 0xad, 0xb6, 0x3c, 0x48, 0xd8, 0xc1, 0x7c, 0x70, 0xa9,
0xf8, 0xc1, 0xe9, 0x09, 0xb0, 0x4d, 0xd9, 0xe0, 0x11, 0xf8, 0xb5, 0xe2, 0x5e, 0xc2, 0xb2, 0x58,
0xf8, 0xb5, 0x4a, 0x9f, 0x21, 0xdc, 0xb9, 0xb8, 0x86, 0x68, 0xdb, 0x9b, 0xb1, 0x7b, 0x25, 0xc1,
0xcb, 0x62, 0x11, 0xd2, 0x7e, 0xaf, 0xf0, 0x1c, 0x96, 0x43, 0xdd, 0x6a, 0x3b, 0xc8, 0xb6, 0xa3,
0xbb, 0x99, 0x38, 0x04, 0x88, 0x10, 0x14, 0x46, 0x4d, 0x74, 0x6b, 0x2c, 0x68, 0x4e, 0x4f, 0x61,
0x41, 0x95, 0xfe, 0x3c, 0xb8, 0x86, 0x70, 0x57, 0xe3, 0x37, 0xba, 0x83, 0x97, 0x68, 0xff, 0xbf,
0xc5, 0x3f, 0x9a, 0x6e, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0xd9, 0xf0, 0x22, 0x6e, 0x72, 0x01,
0x00, 0x00,
}

View File

@@ -1,6 +1,7 @@
syntax = "proto3";
package mvds;
option go_package = "protobuf";
message Payload {
Ack ack = 1;

View File

@@ -1,13 +0,0 @@
package mvds
type state struct {
SendCount uint64
SendEpoch int64
}
type syncState interface {
Get(group GroupID, id MessageID, sender PeerId) state
Set(group GroupID, id MessageID, sender PeerId, newState state)
Remove(group GroupID, id MessageID, sender PeerId)
Map(process func(g GroupID, m MessageID, p PeerId, s state) state)
}

16
state/peerid.go Normal file
View File

@@ -0,0 +1,16 @@
package state
import (
"crypto/ecdsa"
"github.com/ethereum/go-ethereum/crypto"
)
// PeerID is the raw 64-byte identity of a peer, derived from its public key.
type PeerID [64]byte

// PublicKeyToPeerID turns an ECDSA public key into a PeerID.
// NOTE(review): crypto.FromECDSAPub returns a 65-byte uncompressed key
// (0x04 prefix + X + Y); copying into a [64]byte truncates the final
// byte of Y — confirm the intended encoding (e.g. `[1:]` to drop the prefix).
func PublicKeyToPeerID(k ecdsa.PublicKey) PeerID {
	var p PeerID
	copy(p[:], crypto.FromECDSAPub(&k))
	return p
}

13
state/state.go Normal file
View File

@@ -0,0 +1,13 @@
package state
// State tracks the retransmission schedule of one message for one peer.
type State struct {
	SendCount uint64 // number of send attempts so far
	SendEpoch int64  // epoch at which the next send attempt is due
}

// SyncState persists per-(group, message, peer) send state.
// Implementations must be safe for concurrent use.
type SyncState interface {
	Get(group GroupID, id MessageID, peer PeerID) (State, error)
	Set(group GroupID, id MessageID, peer PeerID, newState State) error
	Remove(group GroupID, id MessageID, peer PeerID) error
	// Map calls process for every entry and stores the returned State
	// back in place.
	Map(process func(GroupID, MessageID, PeerID, State) State) error
}

64
state/state_memory.go Normal file
View File

@@ -0,0 +1,64 @@
package state
import (
"sync"
)
// memorySyncState is an in-memory, mutex-guarded SyncState implementation.
type memorySyncState struct {
	sync.Mutex
	state map[GroupID]map[MessageID]map[PeerID]State
}

// NewSyncState returns an empty in-memory SyncState.
// NOTE(review): exported constructor returning an unexported type
// (*memorySyncState); consider returning the SyncState interface.
func NewSyncState() *memorySyncState {
	return &memorySyncState{
		state: make(map[GroupID]map[MessageID]map[PeerID]State),
	}
}
// Get returns the sync state stored for (group, id, peer), or the zero
// State when no entry exists. The error is always nil for this in-memory
// implementation — NOTE(review): callers cannot distinguish "missing"
// from a genuinely zero-valued state; confirm that is acceptable.
func (s *memorySyncState) Get(group GroupID, id MessageID, peer PeerID) (State, error) {
	s.Lock()
	defer s.Unlock()
	// Indexing missing nested maps yields the zero value, so the
	// original's redundant comma-ok form (staticcheck S1005) is dropped.
	return s.state[group][id][peer], nil
}
// Set stores newState for the (group, id, peer) triple, creating the
// nested maps on demand. Never returns a non-nil error.
func (s *memorySyncState) Set(group GroupID, id MessageID, peer PeerID, newState State) error {
	s.Lock()
	defer s.Unlock()

	byMessage, ok := s.state[group]
	if !ok {
		byMessage = make(map[MessageID]map[PeerID]State)
		s.state[group] = byMessage
	}

	byPeer, ok := byMessage[id]
	if !ok {
		byPeer = make(map[PeerID]State)
		byMessage[id] = byPeer
	}

	byPeer[peer] = newState
	return nil
}
// Remove deletes the sync state for (group, id, peer). Deleting a
// missing entry is a no-op; never returns a non-nil error.
func (s *memorySyncState) Remove(group GroupID, id MessageID, peer PeerID) error {
	s.Lock()
	defer s.Unlock()
	// delete on a nil map (missing group/id) is safe and does nothing.
	delete(s.state[group][id], peer)
	return nil
}
// Map applies process to every stored entry and writes the returned
// State back in place. The lock is held for the whole walk, so process
// must not call back into this sync state. Never returns a non-nil error.
func (s *memorySyncState) Map(process func(GroupID, MessageID, PeerID, State) State) error {
	s.Lock()
	defer s.Unlock()

	for group, syncstate := range s.state {
		for id, peers := range syncstate {
			for peer, state := range peers {
				// Writing into the map being ranged is safe for existing keys.
				s.state[group][id][peer] = process(group, id, peer, state)
			}
		}
	}

	return nil
}

View File

@@ -1,14 +1,17 @@
package mvds
package state
import (
"crypto/sha256"
"encoding/binary"
"github.com/status-im/mvds/protobuf"
)
type MessageID [32]byte
type GroupID [32]byte
func (m Message) ID() MessageID {
// ID creates the MessageID for a Message
func ID(m protobuf.Message) MessageID {
t := make([]byte, 8)
binary.LittleEndian.PutUint64(t, uint64(m.Timestamp))

View File

@@ -1,58 +0,0 @@
package mvds
import "sync"
type memorySyncState struct {
sync.Mutex
state map[GroupID]map[MessageID]map[PeerId]state
}
func newSyncState() *memorySyncState {
return &memorySyncState{
state: make(map[GroupID]map[MessageID]map[PeerId]state),
}
}
func (s memorySyncState) Get(group GroupID, id MessageID, sender PeerId) state {
s.Lock()
defer s.Unlock()
state, _ := s.state[group][id][sender]
return state
}
func (s *memorySyncState) Set(group GroupID, id MessageID, sender PeerId, newState state) {
s.Lock()
defer s.Unlock()
if _, ok := s.state[group]; !ok {
s.state[group] = make(map[MessageID]map[PeerId]state)
}
if _, ok := s.state[group][id]; !ok {
s.state[group][id] = make(map[PeerId]state)
}
s.state[group][id][sender] = newState
}
func (s *memorySyncState) Remove(group GroupID, id MessageID, sender PeerId) {
s.Lock()
defer s.Unlock()
delete(s.state[group][id], sender)
}
func (s *memorySyncState) Map(process func(g GroupID, m MessageID, p PeerId, s state) state) {
s.Lock()
defer s.Unlock()
for group, syncstate := range s.state {
for id, peers := range syncstate {
for peer, state := range peers {
s.state[group][id][peer] = process(group, id, peer, state)
}
}
}
}

12
store/messagestore.go Normal file
View File

@@ -0,0 +1,12 @@
package store
import (
"github.com/status-im/mvds/protobuf"
"github.com/status-im/mvds/state"
)
// MessageStore abstracts persistence of sync messages.
type MessageStore interface {
	// Has reports whether the message with the given ID is stored.
	Has(id state.MessageID) bool
	// Get returns the stored message, or an error when it is unknown.
	Get(id state.MessageID) (protobuf.Message, error)
	// Add persists the message.
	Add(message protobuf.Message) error
}

View File

@@ -0,0 +1,44 @@
package store
import (
"errors"
"sync"
"github.com/status-im/mvds/protobuf"
"github.com/status-im/mvds/state"
)
// DummyStore is a volatile, mutex-guarded in-memory MessageStore,
// intended for tests and simulations.
type DummyStore struct {
	sync.Mutex
	ms map[state.MessageID]protobuf.Message
}

// NewDummyStore returns a ready-to-use empty store.
// NOTE(review): returning DummyStore by value copies the embedded mutex
// (go vet copylocks); callers currently take its address immediately —
// consider returning *DummyStore instead.
func NewDummyStore() DummyStore {
	return DummyStore{ms: make(map[state.MessageID]protobuf.Message)}
}
// Has reports whether a message with the given ID is stored.
func (ds *DummyStore) Has(id state.MessageID) bool {
	ds.Lock()
	defer ds.Unlock()
	_, found := ds.ms[id]
	return found
}
// Get returns the stored message for id, or an error when it is unknown.
func (ds *DummyStore) Get(id state.MessageID) (protobuf.Message, error) {
	ds.Lock()
	defer ds.Unlock()

	msg, found := ds.ms[id]
	if !found {
		return protobuf.Message{}, errors.New("message does not exist")
	}
	return msg, nil
}
// Add stores the message keyed by its content-derived MessageID.
// Re-adding an existing message overwrites the previous entry; the
// error is always nil.
func (ds *DummyStore) Add(message protobuf.Message) error {
	ds.Lock()
	defer ds.Unlock()
	ds.ms[state.ID(message)] = message
	return nil
}

View File

@@ -1,12 +0,0 @@
package mvds
type Packet struct {
Group GroupID
Sender PeerId
Payload Payload
}
type Transport interface {
Watch() Packet // @todo might need be changed in the future
Send(group GroupID, sender PeerId, peer PeerId, payload Payload) error
}

17
transport/transport.go Normal file
View File

@@ -0,0 +1,17 @@
package transport
import (
"github.com/status-im/mvds/protobuf"
"github.com/status-im/mvds/state"
)
// Packet couples a payload with the group it belongs to and the peer
// that sent it.
type Packet struct {
	Group   state.GroupID
	Sender  state.PeerID
	Payload protobuf.Payload
}
type Transport interface {
Watch() Packet // @todo might need be changed in the future
Send(group state.GroupID, sender state.PeerID, peer state.PeerID, payload protobuf.Payload) error
}