5 Commits

Author SHA1 Message Date
decanus
c6e703eade sorting 2019-05-28 20:59:31 -04:00
decanus
e786de21ff moved all public funcs to top 2019-05-28 20:58:38 -04:00
decanus
d04eae22cf minor cleanup 2019-05-28 20:44:40 -04:00
decanus
9a5ef0aa5d renamed 2019-05-28 19:51:53 -04:00
decanus
337ca88bfd removing tests for now 2019-05-28 19:44:47 -04:00
6 changed files with 96 additions and 208 deletions

24
node.go
View File

@@ -23,7 +23,7 @@ type Node struct {
sharing map[GroupID][]PeerId
peers map[GroupID][]PeerId
payloads Payloads
payloads payloads
nextEpoch calculateNextEpoch
@@ -39,7 +39,7 @@ func NewNode(ms MessageStore, st Transport, nextEpoch calculateNextEpoch, id Pee
syncState: newSyncState(),
sharing: make(map[GroupID][]PeerId),
peers: make(map[GroupID][]PeerId),
payloads: Payloads{payloads: make(map[GroupID]map[PeerId]Payload)},
payloads: payloads{payloads: make(map[GroupID]map[PeerId]Payload)},
nextEpoch: nextEpoch,
ID: id,
epoch: 0,
@@ -125,6 +125,16 @@ func (n *Node) Share(group GroupID, id PeerId) {
n.sharing[group] = append(n.sharing[group], id)
}
func (n Node) IsPeerInGroup(g GroupID, p PeerId) bool {
for _, peer := range n.sharing[g] {
if peer == p {
return true
}
}
return false
}
func (n *Node) sendMessages() {
n.syncState.Map(func(g GroupID, m MessageID, p PeerId, s state) state {
if s.SendEpoch < n.epoch || !n.IsPeerInGroup(g, p) {
@@ -261,16 +271,6 @@ func (n Node) updateSendEpoch(s state) state {
return s
}
func (n Node) IsPeerInGroup(g GroupID, p PeerId) bool {
for _, peer := range n.sharing[g] {
if peer == p {
return true
}
}
return false
}
func toMessageID(b []byte) MessageID {
var id MessageID
copy(id[:], b)

View File

@@ -1,119 +0,0 @@
package mvds
import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"reflect"
"testing"
"time"
)
func TestOnRequest(t *testing.T) {
m := randomMessageId()
g := GroupID{}
r := Request{}
r.Id = append(r.Id, m[:])
n := getNodeForMessageHandlerTest()
p := randomPeerId()
n.onRequest(g, p, r)
if n.syncState.Get(g, m, p).RequestFlag != true {
t.Errorf("did not set Request flag to true")
}
}
func TestOnAck(t *testing.T) {
m := randomMessageId()
g := GroupID{}
a := Ack{}
a.Id = append(a.Id, m[:])
n := getNodeForMessageHandlerTest()
p := randomPeerId()
n.onAck(g, p, a)
if n.syncState.Get(g, m, p).HoldFlag != true {
t.Errorf("did not set Hold flag to true")
}
}
func TestOnOffer(t *testing.T) {
m := randomMessageId()
g := GroupID{}
o := Offer{}
o.Id = append(o.Id, m[:])
n := getNodeForMessageHandlerTest()
p := randomPeerId()
n.onOffer(g, p, o)
if n.syncState.Get(g, m, p).HoldFlag != true {
t.Errorf("did not set Hold flag to true")
}
if n.offeredMessages[g][p][0] != m {
t.Errorf("message was not added to offered list")
}
}
func TestOnMessage(t *testing.T) {
n := getNodeForMessageHandlerTest()
g := GroupID{}
ds := NewDummyStore()
n.store = &ds
id := randomMessageId()
m := Message{
GroupId: id[:],
Timestamp: time.Now().Unix(),
Body: []byte("hello world"),
}
p := randomPeerId()
n.onMessage(g, p, m)
sm, _ := n.store.Get(m.ID())
if !reflect.DeepEqual(sm, m) {
t.Errorf("message was not stored correctly")
}
s := n.state(g, m.ID(), p)
if s.HoldFlag != true || s.AckFlag != true {
t.Errorf("did not set flags")
}
}
func getNodeForMessageHandlerTest() Node {
n := Node{}
n.syncState = syncState{state: make(map[GroupID]map[MessageID]map[PeerId]*State)}
n.offeredMessages = make(map[GroupID]map[PeerId][]MessageID)
n.ID = randomPeerId()
return n
}
func randomMessageId() MessageID {
bytes := make([]byte, 32)
rand.Read(bytes)
id := MessageID{}
copy(id[:], bytes)
return id
}
func randomPeerId() PeerId {
key, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
return PeerId(key.PublicKey)
}

View File

@@ -4,29 +4,14 @@ import (
"sync"
)
type Payloads struct {
type payloads struct {
sync.Mutex
payloads map[GroupID]map[PeerId]Payload
}
func (p *Payloads) get(id GroupID, peerId PeerId) Payload {
payload, _ := p.payloads[id][peerId]
return payload
}
func (p *Payloads) set(id GroupID, peerId PeerId, payload Payload) {
_, ok := p.payloads[id]
if !ok {
p.payloads[id] = make(map[PeerId]Payload)
}
p.payloads[id][peerId] = payload
}
// @todo check in all the functions below that we aren't duplicating stuff
func (p *Payloads) AddOffers(group GroupID, peer PeerId, offers ...[]byte) {
func (p *payloads) AddOffers(group GroupID, peer PeerId, offers ...[]byte) {
p.Lock()
defer p.Unlock()
@@ -40,7 +25,7 @@ func (p *Payloads) AddOffers(group GroupID, peer PeerId, offers ...[]byte) {
p.set(group, peer, payload)
}
func (p *Payloads) AddAcks(group GroupID, peer PeerId, acks ...[]byte) {
func (p *payloads) AddAcks(group GroupID, peer PeerId, acks ...[]byte) {
p.Lock()
defer p.Unlock()
@@ -54,7 +39,7 @@ func (p *Payloads) AddAcks(group GroupID, peer PeerId, acks ...[]byte) {
p.set(group, peer, payload)
}
func (p *Payloads) AddRequests(group GroupID, peer PeerId, request ...[]byte) {
func (p *payloads) AddRequests(group GroupID, peer PeerId, request ...[]byte) {
p.Lock()
defer p.Unlock()
@@ -68,7 +53,7 @@ func (p *Payloads) AddRequests(group GroupID, peer PeerId, request ...[]byte) {
p.set(group, peer, payload)
}
func (p *Payloads) AddMessages(group GroupID, peer PeerId, messages ...*Message) {
func (p *payloads) AddMessages(group GroupID, peer PeerId, messages ...*Message) {
p.Lock()
defer p.Unlock()
@@ -81,7 +66,7 @@ func (p *Payloads) AddMessages(group GroupID, peer PeerId, messages ...*Message)
p.set(group, peer, payload)
}
func (p *Payloads) Map(f func(GroupID, PeerId, Payload)) {
func (p *payloads) Map(f func(GroupID, PeerId, Payload)) {
p.Lock()
defer p.Unlock()
@@ -94,9 +79,23 @@ func (p *Payloads) Map(f func(GroupID, PeerId, Payload)) {
p.payloads = make(map[GroupID]map[PeerId]Payload)
}
func (p *Payloads) RemoveAll() {
func (p *payloads) RemoveAll() {
p.Lock()
defer p.Unlock()
p.payloads = make(map[GroupID]map[PeerId]Payload)
}
func (p *payloads) get(id GroupID, peerId PeerId) Payload {
payload, _ := p.payloads[id][peerId]
return payload
}
func (p *payloads) set(id GroupID, peerId PeerId, payload Payload) {
_, ok := p.payloads[id]
if !ok {
p.payloads[id] = make(map[PeerId]Payload)
}
p.payloads[id][peerId] = payload
}

View File

@@ -1,63 +1,13 @@
package mvds
import "sync"
type state struct {
SendCount uint64
SendEpoch int64
}
type syncState struct {
sync.Mutex
state map[GroupID]map[MessageID]map[PeerId]state
}
func newSyncState() syncState {
return syncState{
state: make(map[GroupID]map[MessageID]map[PeerId]state),
}
}
func (s syncState) Get(group GroupID, id MessageID, sender PeerId) state {
s.Lock()
defer s.Unlock()
state, _ := s.state[group][id][sender]
return state
}
func (s *syncState) Set(group GroupID, id MessageID, sender PeerId, newState state) {
s.Lock()
defer s.Unlock()
if _, ok := s.state[group]; !ok {
s.state[group] = make(map[MessageID]map[PeerId]state)
}
if _, ok := s.state[group][id]; !ok {
s.state[group][id] = make(map[PeerId]state)
}
s.state[group][id][sender] = newState
}
func (s *syncState) Remove(group GroupID, id MessageID, sender PeerId) {
s.Lock()
defer s.Unlock()
delete(s.state[group][id], sender)
}
func (s *syncState) Map(process func(g GroupID, m MessageID, p PeerId, s state) state) {
s.Lock()
defer s.Unlock()
for group, syncstate := range s.state {
for id, peers := range syncstate {
for peer, state := range peers {
s.state[group][id][peer] = process(group, id, peer, state)
}
}
}
type syncState interface {
Get(group GroupID, id MessageID, sender PeerId) state
Set(group GroupID, id MessageID, sender PeerId, newState state)
Remove(group GroupID, id MessageID, sender PeerId)
Map(process func(g GroupID, m MessageID, p PeerId, s state) state)
}

58
state_memory.go Normal file
View File

@@ -0,0 +1,58 @@
package mvds
import "sync"
type memorySyncState struct {
sync.Mutex
state map[GroupID]map[MessageID]map[PeerId]state
}
func newSyncState() *memorySyncState {
return &memorySyncState{
state: make(map[GroupID]map[MessageID]map[PeerId]state),
}
}
func (s memorySyncState) Get(group GroupID, id MessageID, sender PeerId) state {
s.Lock()
defer s.Unlock()
state, _ := s.state[group][id][sender]
return state
}
func (s *memorySyncState) Set(group GroupID, id MessageID, sender PeerId, newState state) {
s.Lock()
defer s.Unlock()
if _, ok := s.state[group]; !ok {
s.state[group] = make(map[MessageID]map[PeerId]state)
}
if _, ok := s.state[group][id]; !ok {
s.state[group][id] = make(map[PeerId]state)
}
s.state[group][id][sender] = newState
}
func (s *memorySyncState) Remove(group GroupID, id MessageID, sender PeerId) {
s.Lock()
defer s.Unlock()
delete(s.state[group][id], sender)
}
func (s *memorySyncState) Map(process func(g GroupID, m MessageID, p PeerId, s state) state) {
s.Lock()
defer s.Unlock()
for group, syncstate := range s.state {
for id, peers := range syncstate {
for peer, state := range peers {
s.state[group][id][peer] = process(group, id, peer, state)
}
}
}
}