Mirror of https://github.com/vacp2p/mvds.git
enhancement/remove-payloads-function (#20)
* added todos
* started working on more responsive code
* removing unused
* unused
* only returning arrays, using map function
* added insertion functions
* major cleanup
* pointer
* more changes
* minor
* removes payloads once they're sent
* we don't need the check; it was a bug
* no longer needs goroutine
* not using goroutine anymore
* minor changes as suggested
* atomic
* Update README.md
README.md
@@ -2,4 +2,6 @@
 :warning: This code is not production ready, race conditions are likely to occur :warning:
+
+**THIS BRANCH IS BROKEN DUE TO SOME DEADLOCK CAUSED BY NEW PAYLOAD HANDLING**

 Experimental implementation of the [minimal viable data sync protocol specification](https://notes.status.im/O7Xgij1GS3uREKNtzs7Dyw?view).
go.mod (2 changes)
@@ -7,6 +7,8 @@ require (
	github.com/ethereum/go-ethereum v1.8.27
	github.com/golang/protobuf v1.3.1
	github.com/kr/pretty v0.1.0 // indirect
	github.com/pkg/errors v0.8.1
	golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f // indirect
	golang.org/x/tools v0.0.0-20190525145741-7be61e1b0e51
	gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
)
go.sum (6 changes)
@@ -30,18 +30,24 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f h1:R423Cnkcp5JABoeemiGEPlt9tHXFfw5kvc0yqlxRPWo=
golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190525145741-7be61e1b0e51 h1:RhYYBLDB5MoVkvoNGMNk+DSj7WoGhySvIvtEjTyiP74=
golang.org/x/tools v0.0.0-20190525145741-7be61e1b0e51/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
store.go
@@ -2,6 +2,8 @@ package mvds

 import (
 	"sync"
+
+	"github.com/pkg/errors"
 )

 type MessageStore interface {

@@ -30,7 +32,12 @@ func (ds *DummyStore) GetMessage(id MessageID) (Message, error) {
 	ds.Lock()
 	defer ds.Unlock()

-	m, _ := ds.ms[id]; return m, nil
+	m, ok := ds.ms[id]
+	if !ok {
+		return Message{}, errors.New("message does not exist")
+	}
+
+	return m, nil
 }

 func (ds *DummyStore) SaveMessage(message Message) error {
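The rewritten GetMessage turns a silent zero-value result into an explicit error via the comma-ok idiom. A standalone sketch of the same pattern, using simplified stand-in types rather than the library's own definitions:

package main

import (
	"errors"
	"fmt"
	"sync"
)

type MessageID [32]byte
type Message struct{ Body []byte }

// DummyStoreSketch mirrors the comma-ok lookup the commit introduces:
// a missing id now yields an explicit error instead of a zero Message.
type DummyStoreSketch struct {
	sync.Mutex
	ms map[MessageID]Message
}

func (ds *DummyStoreSketch) GetMessage(id MessageID) (Message, error) {
	ds.Lock()
	defer ds.Unlock()

	m, ok := ds.ms[id]
	if !ok {
		return Message{}, errors.New("message does not exist")
	}
	return m, nil
}

func main() {
	ds := &DummyStoreSketch{ms: make(map[MessageID]Message)}
	_, err := ds.GetMessage(MessageID{1})
	fmt.Println(err) // message does not exist
}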
node.go (319 changes)
@@ -6,7 +6,7 @@ import (
 	"crypto/ecdsa"
 	"crypto/elliptic"
 	"log"
-	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/ethereum/go-ethereum/crypto"
@@ -15,24 +15,15 @@
 type calculateNextEpoch func(count uint64, epoch int64) int64
 type PeerId ecdsa.PublicKey

-type State struct {
-	HoldFlag    bool
-	AckFlag     bool
-	RequestFlag bool
-	SendCount   uint64
-	SendEpoch   int64
-}
-
 type Node struct {
-	sync.Mutex
-
 	ms MessageStore
 	st Transport

-	s               syncState
-	offeredMessages map[GroupID]map[PeerId][]MessageID
-	sharing         map[GroupID][]PeerId
-	peers           map[GroupID][]PeerId
+	s       syncState
+	sharing map[GroupID][]PeerId
+	peers   map[GroupID][]PeerId
+
+	payloads Payloads

 	nextEpoch calculateNextEpoch
@@ -43,43 +34,37 @@ type Node struct {

 func NewNode(ms MessageStore, st Transport, nextEpoch calculateNextEpoch, id PeerId) *Node {
 	return &Node{
-		ms:              ms,
-		st:              st,
-		s:               syncState{state: make(map[GroupID]map[MessageID]map[PeerId]state)},
-		offeredMessages: make(map[GroupID]map[PeerId][]MessageID),
-		sharing:         make(map[GroupID][]PeerId),
-		peers:           make(map[GroupID][]PeerId),
-		nextEpoch:       nextEpoch,
-		ID:              id,
-		epoch:           0,
+		ms:        ms,
+		st:        st,
+		s:         syncState{state: make(map[GroupID]map[MessageID]map[PeerId]state)},
+		sharing:   make(map[GroupID][]PeerId),
+		peers:     make(map[GroupID][]PeerId),
+		payloads:  Payloads{payloads: make(map[GroupID]map[PeerId]Payload)},
+		nextEpoch: nextEpoch,
+		ID:        id,
+		epoch:     0,
 	}
 }

 // Run listens for new messages received by the node and sends out those required every tick.
 func (n *Node) Run() {
-
-	// @todo start listening to both the send channel and what the transport receives for later handling
-
-	// @todo maybe some waiting?
-
-	for {
-		<-time.After(1 * time.Second)
-
-		// @todo should probably do a select statement
-		// @todo this is done very badly
 	go func() {
 		// this will be completely legitimate with new payload handling
 		go func() {
 			for {
 				p := n.st.Watch()
-				if p == nil {
-					return
-				}
-
 				n.onPayload(p.Group, p.Sender, p.Payload)
 			}
 		}()
 	}()
-
-	go n.sendMessages() // @todo probably not that efficient here
-	n.epoch += 1
-	}
+
+	go func() {
+		for {
+			time.Sleep(1 * time.Second)
+
+			n.sendMessages()
+			atomic.AddInt64(&n.epoch, 1)
+		}
+	}()
 }
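The rewrite splits the old polling loop in Run into two goroutines: one blocks on the transport and dispatches payloads, the other ticks once per second, flushes outgoing payloads, and advances the epoch through sync/atomic. A minimal self-contained sketch of the ticker half; note that any concurrent reader of the counter must pair atomic.AddInt64 with atomic.LoadInt64, otherwise a plain read is still a data race (a plausible source of the race warnings in the README):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var epoch int64

	// Tick in the background, as the new Run does.
	go func() {
		for {
			time.Sleep(100 * time.Millisecond)
			atomic.AddInt64(&epoch, 1) // write side: atomic increment
		}
	}()

	// Any concurrent reader has to use an atomic load as well;
	// a plain read of epoch here would be a data race.
	time.Sleep(550 * time.Millisecond)
	fmt.Println("epoch:", atomic.LoadInt64(&epoch))
}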
 // AppendMessage sends a message to a given group.

@@ -106,6 +91,8 @@ func (n *Node) AppendMessage(group GroupID, data []byte) (MessageID, error) {
 			continue
 		}

+		// @todo store a sync state only for Offers
+
 		s := n.s.Get(g, id, p)
 		s.SendEpoch = n.epoch + 1
 		n.s.Set(g, id, p, s)
@@ -135,194 +122,137 @@ func (n *Node) Share(group GroupID, id PeerId) {
 }

 func (n *Node) sendMessages() {
-
-	pls := n.payloads()
-
-	for g, payloads := range pls {
-		for id, p := range payloads {
-
-			err := n.st.Send(g, n.ID, id, *p)
-			if err != nil {
-				// @todo
-			}
-		}
-	}
+	n.s.Map(func(g GroupID, m MessageID, p PeerId, s state) state {
+		if s.SendEpoch < n.epoch || !n.IsPeerInGroup(g, p) {
+			return s
+		}
+
+		n.payloads.AddOffers(g, p, m[:])
+		return n.updateSendEpoch(s)
+	})
+
+	n.payloads.Map(func(id GroupID, peer PeerId, payload Payload) {
+		err := n.st.Send(id, n.ID, peer, payload)
+		if err != nil {
+			// @todo
+		}
+	})
+
+	n.payloads.RemoveAll()
 }

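sendMessages now works in three phases per tick: walk the sync state and queue an offer for every entry that is due, flush each accumulated payload through the transport, and clear the queue (Payloads.Map already resets the map, which is why RemoveAll above is a no-op). A stripped-down standalone model of that queue-flush-clear cycle, with illustrative names rather than the library's API:

package main

import (
	"fmt"
	"sync"
)

// outbox models the per-peer payload queue behind the new sendMessages.
type outbox struct {
	sync.Mutex
	queued map[string][]string // peer -> queued offer ids
}

// add corresponds to phase 1: queue an offer that is due for a peer.
func (o *outbox) add(peer, id string) {
	o.Lock()
	defer o.Unlock()
	o.queued[peer] = append(o.queued[peer], id)
}

// flush corresponds to phases 2 and 3: visit every queued payload,
// "send" it, then clear the queue, mirroring Payloads.Map, which
// iterates under the lock and resets the map afterwards.
func (o *outbox) flush(send func(peer string, ids []string)) {
	o.Lock()
	defer o.Unlock()
	for peer, ids := range o.queued {
		send(peer, ids)
	}
	o.queued = make(map[string][]string)
}

func main() {
	o := &outbox{queued: make(map[string][]string)}
	o.add("peer-a", "msg-1")
	o.add("peer-a", "msg-2")
	o.flush(func(p string, ids []string) {
		fmt.Println("send to", p, ids)
	})
}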
 func (n *Node) onPayload(group GroupID, sender PeerId, payload Payload) {
-	n.onAck(group, sender, *payload.Ack)
-	n.onRequest(group, sender, *payload.Request)
-	n.onOffer(group, sender, *payload.Offer)
+	if payload.Ack != nil {
+		n.onAck(group, sender, *payload.Ack)
+	}

-	for _, m := range payload.Messages {
-		n.onMessage(group, sender, *m)
+	if payload.Request != nil {
+		n.payloads.AddMessages(group, sender, n.onRequest(group, sender, *payload.Request)...)
+	}
+
+	if payload.Offer != nil {
+		n.payloads.AddRequests(group, sender, n.onOffer(group, sender, *payload.Offer)...)
+	}
+
+	if payload.Messages != nil {
+		n.payloads.AddAcks(group, sender, n.onMessages(group, sender, payload.Messages)...)
 	}
 }

-func (n *Node) onOffer(group GroupID, sender PeerId, msg Offer) {
+func (n *Node) onOffer(group GroupID, sender PeerId, msg Offer) [][]byte {
+	r := make([][]byte, 0)
+
 	for _, raw := range msg.Id {
 		id := toMessageID(raw)
-		n.offerMessage(group, sender, id)
-
-		s := n.s.Get(group, id, sender)
-		s.HoldFlag = true
-		n.s.Set(group, id, sender, s)

 		log.Printf("[%x] OFFER (%x -> %x): %x received.\n", group[:4], sender.toBytes()[:4], n.ID.toBytes()[:4], id[:4])

 		// @todo maybe ack?
 		if n.ms.HasMessage(id) {
 			continue
 		}
+
+		r = append(r, raw)
+		log.Printf("[%x] sending REQUEST (%x -> %x): %x\n", group[:4], n.ID.toBytes()[:4], sender.toBytes()[:4], id[:4])
 	}
+
+	return r
 }

-func (n *Node) onRequest(group GroupID, sender PeerId, msg Request) {
+func (n *Node) onRequest(group GroupID, sender PeerId, msg Request) []*Message {
+	m := make([]*Message, 0)
+
 	for _, raw := range msg.Id {
 		id := toMessageID(raw)

-		s := n.s.Get(group, id, sender)
-		s.RequestFlag = true
-		n.s.Set(group, id, sender, s)
-
 		log.Printf("[%x] REQUEST (%x -> %x): %x received.\n", group[:4], sender.toBytes()[:4], n.ID.toBytes()[:4], id[:4])

 		if !n.IsPeerInGroup(group, sender) {
 			continue
 		}

 		message, err := n.ms.GetMessage(id)
 		if err != nil {
 			log.Printf("error requesting message %x", id[:4])
 			continue
 		}

+		n.s.Set(group, id, sender, n.updateSendEpoch(n.s.Get(group, id, sender)))
+
+		m = append(m, &message)
 	}
+
+	return m
 }

+// @todo this should return nothing?
 func (n *Node) onAck(group GroupID, sender PeerId, msg Ack) {
 	for _, raw := range msg.Id {
 		id := toMessageID(raw)

-		s := n.s.Get(group, id, sender)
-		s.HoldFlag = true
-		n.s.Set(group, id, sender, s)
+		n.s.Remove(group, id, sender)

 		log.Printf("[%x] ACK (%x -> %x): %x received.\n", group[:4], sender.toBytes()[:4], n.ID.toBytes()[:4], id[:4])
 	}
 }

-func (n *Node) onMessage(group GroupID, sender PeerId, msg Message) {
-	id := msg.ID()
-
-	s := n.s.Get(group, id, sender)
-	s.HoldFlag = true
-	s.AckFlag = true
-	n.s.Set(group, id, sender, s)
-
-	err := n.ms.SaveMessage(msg)
-	if err != nil {
-		// @todo process, should this function ever even have an error?
-	}
-
-	log.Printf("[%x] MESSAGE (%x -> %x): %x received.\n", group[:4], sender.toBytes()[:4], n.ID.toBytes()[:4], id[:4])
-}
+func (n *Node) onMessages(group GroupID, sender PeerId, messages []*Message) [][]byte {
+	a := make([][]byte, 0)
+
+	for _, m := range messages {
+		err := n.onMessage(group, sender, *m)
+		if err != nil {
+			// @todo
+			continue
+		}
+
+		id := m.ID()
+		log.Printf("[%x] sending ACK (%x -> %x): %x\n", group[:4], n.ID.toBytes()[:4], sender.toBytes()[:4], id[:4])
+		a = append(a, id[:])
+	}
+
+	return a
+}
+
+// @todo this should return ACKs
+func (n *Node) onMessage(group GroupID, sender PeerId, msg Message) error {
+	id := msg.ID()
+	log.Printf("[%x] MESSAGE (%x -> %x): %x received.\n", group[:4], sender.toBytes()[:4], n.ID.toBytes()[:4], id[:4])
+
+	// @todo share message with those around us
+
+	err := n.ms.SaveMessage(msg)
+	if err != nil {
+		return err
+	}
+
+	// @todo push message somewhere for end user
+	return nil
+}

-func (n *Node) payloads() map[GroupID]map[PeerId]*Payload {
-	n.Lock()
-	defer n.Unlock()
-
-	pls := make(map[GroupID]map[PeerId]*Payload)
-
-	// Ack offered Messages
-	o := n.offeredMessages
-	for group, offers := range o {
-		for peer, messages := range offers {
-			// @todo do we need this?
-			if _, ok := pls[group]; !ok {
-				pls[group] = make(map[PeerId]*Payload)
-			}
-
-			if _, ok := pls[group][peer]; !ok {
-				pls[group][peer] = createPayload()
-			}
-
-			for _, id := range messages {
-				// Ack offered Messages
-				if n.ms.HasMessage(id) && n.s.Get(group, id, peer).AckFlag {
-					s := n.s.Get(group, id, peer)
-					s.AckFlag = true
-					n.s.Set(group, id, peer, s)
-
-					pls[group][peer].Ack.Id = append(pls[group][peer].Ack.Id, id[:])
-				}
-
-				// Request offered Messages
-				s := n.s.Get(group, id, peer)
-				if !n.ms.HasMessage(id) && s.SendEpoch <= n.epoch {
-					pls[group][peer].Request.Id = append(pls[group][peer].Request.Id, id[:])
-
-					s.HoldFlag = true
-					n.s.Set(group, id, peer, s)
-
-					n.updateSendEpoch(group, id, peer)
-				}
-			}
-		}
-	}
-
-	n.s.Map(func(group GroupID, id MessageID, peer PeerId, s state) state {
-		if _, ok := pls[group]; !ok {
-			pls[group] = make(map[PeerId]*Payload)
-		}
-
-		if _, ok := pls[group][peer]; !ok {
-			pls[group][peer] = createPayload()
-		}
-
-		// Ack sent Messages
-		if s.AckFlag {
-			pls[group][peer].Ack.Id = append(pls[group][peer].Ack.Id, id[:])
-			s.AckFlag = false
-		}
-
-		if n.IsPeerInGroup(group, peer) {
-			// Offer Messages
-			if !s.HoldFlag && s.SendEpoch <= n.epoch {
-				pls[group][peer].Offer.Id = append(pls[group][peer].Offer.Id, id[:])
-				s.SendCount += 1
-				s.SendEpoch += n.nextEpoch(s.SendCount, n.epoch)
-				// @todo do we wanna send messages like in interactive mode?
-			}
-
-			// send requested Messages
-			if s.RequestFlag {
-				m, err := n.ms.GetMessage(id)
-				if err != nil {
-					log.Printf("error retreiving message: %s", err)
-					return s
-				}
-
-				pls[group][peer].Messages = append(pls[group][peer].Messages, &m)
-				s.SendCount += 1
-				s.SendEpoch += n.nextEpoch(s.SendCount, n.epoch)
-				s.RequestFlag = false
-			}
-		}
-
-		return s
-	})
-
-	return pls
-}
-
-func (n *Node) offerMessage(group GroupID, sender PeerId, id MessageID) {
-	n.Lock()
-	defer n.Unlock()
-
-	if _, ok := n.offeredMessages[group]; !ok {
-		n.offeredMessages[group] = make(map[PeerId][]MessageID)
-	}
-
-	if _, ok := n.offeredMessages[group][sender]; !ok {
-		n.offeredMessages[group][sender] = make([]MessageID, 0)
-	}
-
-	n.offeredMessages[group][sender] = append(n.offeredMessages[group][sender], id)
-}
-
-func (n *Node) updateSendEpoch(g GroupID, m MessageID, p PeerId) {
-	s := n.s.Get(g, m, p)
+func (n Node) updateSendEpoch(s state) state {
 	s.SendCount += 1
 	s.SendEpoch += n.nextEpoch(s.SendCount, n.epoch)
-	n.s.Set(g, m, p, s)
+	return s
 }

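updateSendEpoch now takes and returns a state value and leaves storage to the caller, so it composes with syncState.Map. The retransmission schedule itself is whatever calculateNextEpoch function the node was constructed with; the commit does not include one, so the exponential backoff below is only a plausible example of the declared signature:

package main

import "fmt"

// calculateNextEpoch as declared in node.go.
type calculateNextEpoch func(count uint64, epoch int64) int64

// Hypothetical schedule: wait 2^count epochs before the next retransmission.
// This particular backoff is an assumption; the repo injects its own function.
var backoff calculateNextEpoch = func(count uint64, epoch int64) int64 {
	return int64(1) << count
}

func main() {
	var sendEpoch int64
	var sendCount uint64

	// Each resend attempt pushes the next send epoch further out,
	// mirroring what updateSendEpoch does to a state value.
	for i := 0; i < 4; i++ {
		sendCount += 1
		sendEpoch += backoff(sendCount, sendEpoch)
		fmt.Printf("attempt %d -> next send at epoch %d\n", sendCount, sendEpoch)
	}
}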
 func (n Node) IsPeerInGroup(g GroupID, p PeerId) bool {

@@ -341,15 +271,6 @@ func toMessageID(b []byte) MessageID {
 	return id
 }

-func createPayload() *Payload {
-	return &Payload{
-		Ack:      &Ack{Id: make([][]byte, 0)},
-		Offer:    &Offer{Id: make([][]byte, 0)},
-		Request:  &Request{Id: make([][]byte, 0)},
-		Messages: make([]*Message, 0),
-	}
-}
-
 func (p PeerId) toBytes() []byte {
 	if p.X == nil || p.Y == nil {
 		return nil
payloads.go (new file, 102 lines)
@@ -0,0 +1,102 @@
package mvds

import (
	"sync"
)

type Payloads struct {
	sync.RWMutex

	payloads map[GroupID]map[PeerId]Payload
}

func (p *Payloads) get(id GroupID, peerId PeerId) Payload {
	payload, _ := p.payloads[id][peerId]
	return payload
}

func (p *Payloads) set(id GroupID, peerId PeerId, payload Payload) {
	_, ok := p.payloads[id]
	if !ok {
		p.payloads[id] = make(map[PeerId]Payload)
	}

	p.payloads[id][peerId] = payload
}

// @todo check in all the functions below that we aren't duplicating stuff

func (p *Payloads) AddOffers(group GroupID, peer PeerId, offers ...[]byte) {
	p.Lock()
	defer p.Unlock()

	payload := p.get(group, peer)
	if payload.Offer == nil {
		payload.Offer = &Offer{Id: make([][]byte, 0)}
	}

	payload.Offer.Id = append(payload.Offer.Id, offers...)

	p.set(group, peer, payload)
}

func (p *Payloads) AddAcks(group GroupID, peer PeerId, acks ...[]byte) {
	p.Lock()
	defer p.Unlock()

	payload := p.get(group, peer)
	if payload.Ack == nil {
		payload.Ack = &Ack{Id: make([][]byte, 0)}
	}

	payload.Ack.Id = append(payload.Ack.Id, acks...)

	p.set(group, peer, payload)
}

func (p *Payloads) AddRequests(group GroupID, peer PeerId, request ...[]byte) {
	p.Lock()
	defer p.Unlock()

	payload := p.get(group, peer)
	if payload.Request == nil {
		payload.Request = &Request{Id: make([][]byte, 0)}
	}

	payload.Request.Id = append(payload.Request.Id, request...)

	p.set(group, peer, payload)
}

func (p *Payloads) AddMessages(group GroupID, peer PeerId, messages ...*Message) {
	p.Lock()
	defer p.Unlock()

	payload := p.get(group, peer)
	if payload.Messages == nil {
		payload.Messages = make([]*Message, 0)
	}

	payload.Messages = append(payload.Messages, messages...)
	p.set(group, peer, payload)
}

func (p *Payloads) Map(f func(GroupID, PeerId, Payload)) {
	p.Lock()
	defer p.Unlock()

	for g, payloads := range p.payloads {
		for peer, payload := range payloads {
			f(g, peer, payload)
		}
	}

	p.payloads = make(map[GroupID]map[PeerId]Payload)
}

func (p *Payloads) RemoveAll() {
	//p.Lock()
	//defer p.Unlock()

	//p.payloads = make(map[GroupID]map[PeerId]Payload)
}
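Note that get returns the Payload by value: each Add* method mutates a copy, lazily initializing the nil Offer/Ack/Request field, and must write the copy back through set. A standalone model of that copy-modify-write-back path, with simplified stand-in types rather than the package's own:

package main

import (
	"fmt"
	"sync"
)

// payload stands in for mvds.Payload; only the Offer field is modeled.
type payload struct {
	Offer *[]string
}

type payloads struct {
	sync.RWMutex
	m map[string]payload
}

func (p *payloads) AddOffers(peer string, offers ...string) {
	p.Lock()
	defer p.Unlock()

	pl := p.m[peer] // value copy; zero payload if the peer is absent
	if pl.Offer == nil {
		pl.Offer = &[]string{} // lazy init, as payloads.go does for Offer/Ack/Request
	}
	*pl.Offer = append(*pl.Offer, offers...)
	p.m[peer] = pl // write the modified copy back
}

func main() {
	p := &payloads{m: make(map[string]payload)}
	p.AddOffers("peer-a", "id-1", "id-2")
	fmt.Println(*p.m["peer-a"].Offer) // [id-1 id-2]
}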
@@ -19,13 +19,8 @@ type Transport struct {
 	out map[mvds.PeerId]chan<- mvds.Packet
 }

-func (t *Transport) Watch() *mvds.Packet {
-	select {
-	case p := <-t.in:
-		return &p
-	default:
-		return nil
-	}
+func (t *Transport) Watch() mvds.Packet {
+	return <-t.in
 }

 func (t *Transport) Send(group mvds.GroupID, sender mvds.PeerId, peer mvds.PeerId, payload mvds.Payload) error {
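On the Watch change above: it drops the select-with-default that returned *mvds.Packet (nil when the channel was empty) in favor of a plain blocking receive by value. That is only safe because Run now calls Watch from a dedicated goroutine. A standalone sketch contrasting the two styles:

package main

import "fmt"

type packet struct{ n int }

// Old style: poll without blocking; nil means "nothing yet".
func watchPoll(in chan packet) *packet {
	select {
	case p := <-in:
		return &p
	default:
		return nil
	}
}

// New style: block until a packet arrives. Safe only on its own goroutine.
func watchBlock(in chan packet) packet {
	return <-in
}

func main() {
	in := make(chan packet, 1)
	fmt.Println(watchPoll(in)) // <nil>: nothing queued yet

	in <- packet{n: 1}
	fmt.Println(watchBlock(in)) // {1}
}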
@@ -84,9 +79,9 @@ func main() {
 	nc.Share(group, na.ID)
 	nc.Share(group, nb.ID)

-	go na.Run()
-	go nb.Run()
-	go nc.Run()
+	na.Run()
+	nb.Run()
+	nc.Run()

 	chat(group, na, nb)
 }
@@ -98,7 +93,7 @@ func createNode(transport *Transport, id mvds.PeerId) *mvds.Node {

 func chat(group mvds.GroupID, nodes ...*mvds.Node) {
 	for {
-		<-time.After(5 * time.Second)
+		time.Sleep(5 * time.Second)

 		for _, n := range nodes {
 			_, err := n.AppendMessage(group, []byte("test"))
state.go (12 changes)
@@ -3,9 +3,6 @@ package mvds

import "sync"

type state struct {
	HoldFlag    bool
	AckFlag     bool
	RequestFlag bool
	SendCount   uint64
	SendEpoch   int64
}
@@ -42,11 +39,14 @@ func (s *syncState) Set(group GroupID, id MessageID, sender PeerId, state state)
 	s.state[group][id][sender] = state
 }

-func (s syncState) Iterate() map[GroupID]map[MessageID]map[PeerId]state {
-	return s.state
+func (s *syncState) Remove(group GroupID, id MessageID, sender PeerId) {
+	s.Lock()
+	defer s.Unlock()
+
+	delete(s.state[group][id], sender)
 }

-func (s syncState) Map(process func(g GroupID, m MessageID, p PeerId, s state) state) {
+func (s *syncState) Map(process func(g GroupID, m MessageID, p PeerId, s state) state) {
 	s.Lock()
 	defer s.Unlock()
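On the syncState changes above: Iterate, which handed out the underlying map with no locking, is gone; in its place are Remove and a pointer-receiver Map that holds the lock while a callback visits and rewrites each entry. A simplified standalone model of that visit-and-update shape:

package main

import (
	"fmt"
	"sync"
)

type state struct{ SendCount uint64 }

// Simplified stand-in for syncState: Map visits every entry under the lock
// and stores whatever state the callback returns, as the new Map does.
type syncState struct {
	sync.Mutex
	state map[string]state
}

func (s *syncState) Map(process func(id string, st state) state) {
	s.Lock()
	defer s.Unlock()
	for id, st := range s.state {
		s.state[id] = process(id, st)
	}
}

func main() {
	s := &syncState{state: map[string]state{"m1": {}, "m2": {}}}
	s.Map(func(id string, st state) state {
		st.SendCount += 1 // update in place, as sendMessages does via updateSendEpoch
		return st
	})
	fmt.Println(s.state["m1"].SendCount, s.state["m2"].SendCount) // 1 1
}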
@@ -7,6 +7,6 @@ type Packet struct {
 }

 type Transport interface {
-	Watch() *Packet // @todo might need be changed in the future
+	Watch() Packet // @todo might need be changed in the future
 	Send(group GroupID, sender PeerId, peer PeerId, payload Payload) error
 }